suryanshp1 commited on
Commit
edea0e8
·
verified ·
1 Parent(s): 12aad6c

Upload app.py

Browse files
Files changed (1) hide show
  1. app.py +423 -0
app.py ADDED
@@ -0,0 +1,423 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ CVE Details MCP Server with Gradio Interface
4
+ A Model Context Protocol server that provides CVE vulnerability details through a web interface.
5
+ """
6
+
7
+ import asyncio
8
+ import logging
9
+ from typing import Any, Dict, Optional
10
+ from urllib.parse import quote
11
+ import aiohttp
12
+ import gradio as gr
13
+ import ssl
14
+ import certifi
15
+
16
+
17
+
18
+ # Configure logging
19
+ logging.basicConfig(level=logging.INFO)
20
+ logger = logging.getLogger(__name__)
21
+
22
+ class CVEDataFetcher:
23
+ def __init__(self):
24
+ self.session: Optional[aiohttp.ClientSession] = None
25
+
26
+ async def __aenter__(self):
27
+ ssl_context = ssl.create_default_context(cafile=certifi.where())
28
+ connector = aiohttp.TCPConnector(ssl=ssl_context)
29
+ self.session = aiohttp.ClientSession(connector=connector)
30
+ return self
31
+
32
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
33
+ if self.session:
34
+ await self.session.close()
35
+
36
+ async def get_cve_details(self, cve_id: str) -> Dict[str, Any]:
37
+ """Fetch detailed CVE information from NVD API."""
38
+ if not cve_id or not cve_id.upper().startswith('CVE-'):
39
+ raise ValueError("Invalid CVE ID format. Expected format: CVE-YYYY-NNNN")
40
+
41
+ cve_id = cve_id.upper()
42
+ url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_id}"
43
+
44
+ async with self.session.get(url) as response:
45
+ if response.status != 200:
46
+ raise Exception(f"Failed to fetch CVE data: HTTP {response.status}")
47
+
48
+ data = await response.json()
49
+
50
+ if not data.get('vulnerabilities'):
51
+ raise Exception(f"CVE {cve_id} not found")
52
+
53
+ vuln = data['vulnerabilities'][0]['cve']
54
+
55
+ # Extract and format the vulnerability details
56
+ details = {
57
+ 'id': vuln['id'],
58
+ 'sourceIdentifier': vuln.get('sourceIdentifier', 'N/A'),
59
+ 'published': vuln.get('published', 'N/A'),
60
+ 'lastModified': vuln.get('lastModified', 'N/A'),
61
+ 'vulnStatus': vuln.get('vulnStatus', 'N/A'),
62
+ 'descriptions': [],
63
+ 'cvss_scores': {},
64
+ 'weaknesses': [],
65
+ 'configurations': [],
66
+ 'references': [],
67
+ 'vendor_comments': []
68
+ }
69
+
70
+ # Descriptions
71
+ for desc in vuln.get('descriptions', []):
72
+ if desc['lang'] == 'en':
73
+ details['descriptions'].append(desc['value'])
74
+
75
+ # CVSS Scores
76
+ metrics = vuln.get('metrics', {})
77
+ if 'cvssMetricV31' in metrics:
78
+ cvss31 = metrics['cvssMetricV31'][0]['cvssData']
79
+ details['cvss_scores']['v3.1'] = {
80
+ 'baseScore': cvss31.get('baseScore'),
81
+ 'baseSeverity': cvss31.get('baseSeverity'),
82
+ 'vectorString': cvss31.get('vectorString'),
83
+ 'attackVector': cvss31.get('attackVector'),
84
+ 'attackComplexity': cvss31.get('attackComplexity'),
85
+ 'privilegesRequired': cvss31.get('privilegesRequired'),
86
+ 'userInteraction': cvss31.get('userInteraction'),
87
+ 'scope': cvss31.get('scope'),
88
+ 'confidentialityImpact': cvss31.get('confidentialityImpact'),
89
+ 'integrityImpact': cvss31.get('integrityImpact'),
90
+ 'availabilityImpact': cvss31.get('availabilityImpact')
91
+ }
92
+
93
+ if 'cvssMetricV2' in metrics:
94
+ cvss2 = metrics['cvssMetricV2'][0]['cvssData']
95
+ details['cvss_scores']['v2.0'] = {
96
+ 'baseScore': cvss2.get('baseScore'),
97
+ 'vectorString': cvss2.get('vectorString'),
98
+ 'accessVector': cvss2.get('accessVector'),
99
+ 'accessComplexity': cvss2.get('accessComplexity'),
100
+ 'authentication': cvss2.get('authentication'),
101
+ 'confidentialityImpact': cvss2.get('confidentialityImpact'),
102
+ 'integrityImpact': cvss2.get('integrityImpact'),
103
+ 'availabilityImpact': cvss2.get('availabilityImpact')
104
+ }
105
+
106
+ # Weaknesses (CWE)
107
+ for weakness in vuln.get('weaknesses', []):
108
+ for desc in weakness.get('description', []):
109
+ if desc['lang'] == 'en':
110
+ details['weaknesses'].append({
111
+ 'type': weakness.get('type'),
112
+ 'cwe_id': desc.get('value'),
113
+ 'description': desc.get('value')
114
+ })
115
+
116
+ # Configurations (affected systems)
117
+ for config in vuln.get('configurations', []):
118
+ for node in config.get('nodes', []):
119
+ for cpe_match in node.get('cpeMatch', []):
120
+ details['configurations'].append({
121
+ 'criteria': cpe_match.get('criteria'),
122
+ 'vulnerable': cpe_match.get('vulnerable'),
123
+ 'versionStartIncluding': cpe_match.get('versionStartIncluding'),
124
+ 'versionEndExcluding': cpe_match.get('versionEndExcluding')
125
+ })
126
+
127
+ # References
128
+ for ref in vuln.get('references', []):
129
+ details['references'].append({
130
+ 'url': ref.get('url'),
131
+ 'source': ref.get('source'),
132
+ 'tags': ref.get('tags', [])
133
+ })
134
+
135
+ # Vendor comments
136
+ for comment in vuln.get('vendorComments', []):
137
+ details['vendor_comments'].append({
138
+ 'organization': comment.get('organization'),
139
+ 'comment': comment.get('comment'),
140
+ 'lastModified': comment.get('lastModified')
141
+ })
142
+
143
+ return details
144
+
145
+ async def search_cves(self, keyword: str, limit: int = 10) -> Dict[str, Any]:
146
+ """Search for CVEs by keyword."""
147
+ if not keyword:
148
+ raise ValueError("Keyword is required for search")
149
+
150
+ # URL encode the keyword
151
+ encoded_keyword = quote(keyword)
152
+ url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?keywordSearch={encoded_keyword}&resultsPerPage={limit}"
153
+
154
+ async with self.session.get(url) as response:
155
+ if response.status != 200:
156
+ raise Exception(f"Failed to search CVEs: HTTP {response.status}")
157
+
158
+ data = await response.json()
159
+
160
+ results = {
161
+ 'total_results': data.get('totalResults', 0),
162
+ 'results_per_page': data.get('resultsPerPage', 0),
163
+ 'start_index': data.get('startIndex', 0),
164
+ 'cves': []
165
+ }
166
+
167
+ for vuln_data in data.get('vulnerabilities', []):
168
+ vuln = vuln_data['cve']
169
+ cve_summary = {
170
+ 'id': vuln['id'],
171
+ 'published': vuln.get('published', 'N/A'),
172
+ 'lastModified': vuln.get('lastModified', 'N/A'),
173
+ 'vulnStatus': vuln.get('vulnStatus', 'N/A'),
174
+ 'description': '',
175
+ 'cvss_score': None,
176
+ 'severity': None
177
+ }
178
+
179
+ # Get first English description
180
+ for desc in vuln.get('descriptions', []):
181
+ if desc['lang'] == 'en':
182
+ cve_summary['description'] = desc['value'][:200] + '...' if len(desc['value']) > 200 else desc['value']
183
+ break
184
+
185
+ # Get CVSS score
186
+ metrics = vuln.get('metrics', {})
187
+ if 'cvssMetricV31' in metrics:
188
+ cvss_data = metrics['cvssMetricV31'][0]['cvssData']
189
+ cve_summary['cvss_score'] = cvss_data.get('baseScore')
190
+ cve_summary['severity'] = cvss_data.get('baseSeverity')
191
+ elif 'cvssMetricV2' in metrics:
192
+ cvss_data = metrics['cvssMetricV2'][0]['cvssData']
193
+ cve_summary['cvss_score'] = cvss_data.get('baseScore')
194
+
195
+ results['cves'].append(cve_summary)
196
+
197
+ return results
198
+
199
+ # Global server instance
200
+ cve_fetcher = CVEDataFetcher()
201
+
202
+ def get_cve_details_sync(cve_id: str) -> str:
203
+ """Synchronous wrapper for getting CVE details."""
204
+ return asyncio.run(get_cve_details_gradio(cve_id))
205
+
206
+ def search_cves_sync(keyword: str, limit: int) -> str:
207
+ """Synchronous wrapper for searching CVEs."""
208
+ return asyncio.run(search_cves_gradio(keyword, limit))
209
+
210
+ async def get_cve_details_gradio(cve_id: str) -> str:
211
+ """Gradio interface function for getting CVE details."""
212
+
213
+ if not cve_id.strip():
214
+ return "Please enter a CVE ID (e.g., CVE-2023-1234)"
215
+
216
+ try:
217
+ async with CVEDataFetcher() as fetcher:
218
+ details = await fetcher.get_cve_details(cve_id.strip())
219
+
220
+ # Format the output for better readability
221
+ output = f"# {details['id']} Details\n\n"
222
+ output += f"**Status:** {details['vulnStatus']}\n"
223
+ output += f"**Published:** {details['published']}\n"
224
+ output += f"**Last Modified:** {details['lastModified']}\n"
225
+ output += f"**Source:** {details['sourceIdentifier']}\n\n"
226
+
227
+ # Description
228
+ if details['descriptions']:
229
+ output += "## Description\n"
230
+ for desc in details['descriptions']:
231
+ output += f"{desc}\n\n"
232
+
233
+ # CVSS Scores
234
+ if details['cvss_scores']:
235
+ output += "## CVSS Scores\n"
236
+ for version, scores in details['cvss_scores'].items():
237
+ output += f"### CVSS {version}\n"
238
+ if 'baseScore' in scores:
239
+ output += f"**Base Score:** {scores['baseScore']}\n"
240
+ if 'baseSeverity' in scores:
241
+ output += f"**Severity:** {scores['baseSeverity']}\n"
242
+ if 'vectorString' in scores:
243
+ output += f"**Vector:** {scores['vectorString']}\n"
244
+ output += "\n"
245
+
246
+ # Weaknesses
247
+ if details['weaknesses']:
248
+ output += "## Weaknesses (CWE)\n"
249
+ for weakness in details['weaknesses']:
250
+ output += f"- **{weakness['cwe_id']}:** {weakness['description']}\n"
251
+ output += "\n"
252
+
253
+ # Affected Configurations
254
+ if details['configurations']:
255
+ output += "## Affected Configurations\n"
256
+ for config in details['configurations'][:10]: # Limit to first 10
257
+ output += f"- {config['criteria']}\n"
258
+ if config.get('versionStartIncluding'):
259
+ output += f" - From version: {config['versionStartIncluding']}\n"
260
+ if config.get('versionEndExcluding'):
261
+ output += f" - Before version: {config['versionEndExcluding']}\n"
262
+ output += "\n"
263
+
264
+ # References
265
+ if details['references']:
266
+ output += "## References\n"
267
+ for ref in details['references'][:10]: # Limit to first 10
268
+ output += f"- [{ref['source']}]({ref['url']})\n"
269
+ if ref['tags']:
270
+ output += f" - Tags: {', '.join(ref['tags'])}\n"
271
+ output += "\n"
272
+
273
+ return output
274
+
275
+ except Exception as e:
276
+ return f"Error: {str(e)}"
277
+
278
+ async def search_cves_gradio(keyword: str, limit: int) -> str:
279
+ """Gradio interface function for searching CVEs."""
280
+ if not keyword.strip():
281
+ return "Please enter a search keyword"
282
+
283
+ try:
284
+ async with CVEDataFetcher() as fetcher:
285
+ results = await fetcher.search_cves(keyword.strip(), limit)
286
+
287
+ output = f"# CVE Search Results for '{keyword}'\n\n"
288
+ output += f"**Total Results:** {results['total_results']}\n"
289
+ output += f"**Showing:** {len(results['cves'])} results\n\n"
290
+
291
+ for cve in results['cves']:
292
+ output += f"## {cve['id']}\n"
293
+ output += f"**Published:** {cve['published']}\n"
294
+ output += f"**Status:** {cve['vulnStatus']}\n"
295
+ if cve['cvss_score']:
296
+ output += f"**CVSS Score:** {cve['cvss_score']}"
297
+ if cve['severity']:
298
+ output += f" ({cve['severity']})"
299
+ output += "\n"
300
+ output += f"**Description:** {cve['description']}\n\n"
301
+ output += "---\n\n"
302
+
303
+ return output
304
+
305
+ except Exception as e:
306
+ return f"Error: {str(e)}"
307
+
308
+ def create_gradio_interface():
309
+ """Create the Gradio web interface."""
310
+ with gr.Blocks(title="CVE Details MCP Server", theme=gr.themes.Soft()) as app:
311
+ gr.Markdown("# 🔒 CVE Details MCP Server")
312
+ gr.Markdown("Get comprehensive vulnerability details from the National Vulnerability Database")
313
+
314
+ with gr.Tabs():
315
+ # CVE Details Tab
316
+ with gr.Tab("CVE Details"):
317
+ gr.Markdown("Enter a CVE ID to get detailed vulnerability information")
318
+
319
+ with gr.Row():
320
+ cve_input = gr.Textbox(
321
+ label="CVE ID",
322
+ placeholder="e.g., CVE-2023-1234",
323
+ value="CVE-2023-44487"
324
+ )
325
+ get_details_btn = gr.Button("Get Details", variant="primary")
326
+
327
+ cve_output = gr.Markdown(label="CVE Details")
328
+
329
+ get_details_btn.click(
330
+ fn=get_cve_details_sync,
331
+ inputs=[cve_input],
332
+ outputs=[cve_output]
333
+ )
334
+
335
+ # CVE Search Tab
336
+ with gr.Tab("CVE Search"):
337
+ gr.Markdown("Search for CVEs by keyword, vendor, or product name")
338
+
339
+ with gr.Row():
340
+ search_input = gr.Textbox(
341
+ label="Search Keyword",
342
+ placeholder="e.g., apache, wordpress, buffer overflow"
343
+ )
344
+ limit_input = gr.Slider(
345
+ label="Max Results",
346
+ minimum=1,
347
+ maximum=50,
348
+ value=10,
349
+ step=1
350
+ )
351
+
352
+ search_btn = gr.Button("Search CVEs", variant="primary")
353
+ search_output = gr.Markdown(label="Search Results")
354
+
355
+ search_btn.click(
356
+ fn=search_cves_sync,
357
+ inputs=[search_input, limit_input],
358
+ outputs=[search_output]
359
+ )
360
+
361
+ # MCP Info Tab
362
+ with gr.Tab("MCP Server Info"):
363
+ gr.Markdown("""
364
+ ## Model Context Protocol (MCP) Server
365
+
366
+ This server implements the MCP specification and provides two main tools:
367
+
368
+ ### Available Tools:
369
+ 1. **get_cve_details**: Get comprehensive details for a specific CVE
370
+ 2. **search_cves**: Search for CVEs by keyword
371
+
372
+ ### Usage as MCP Server:
373
+ This interface automatically runs as an MCP server when launched.
374
+
375
+ ### Features:
376
+ - Real-time CVE data from NIST NVD
377
+ - CVSS v2.0 and v3.1 scoring
378
+ - CWE weakness classification
379
+ - Affected product configurations
380
+ - Reference links and vendor comments
381
+ - Comprehensive search functionality
382
+
383
+ ### Data Source:
384
+ [NIST National Vulnerability Database (NVD)](https://nvd.nist.gov/)
385
+ """)
386
+
387
+ return app
388
+
389
+ # Create the main CVE details interface
390
+ cve_details_demo = gr.Interface(
391
+ fn=get_cve_details_sync,
392
+ inputs=gr.Textbox(label="CVE ID", placeholder="e.g., CVE-2023-1234"),
393
+ outputs=gr.Markdown(label="CVE Details"),
394
+ title="CVE Details Lookup",
395
+ description="Enter a CVE ID to get comprehensive vulnerability information from the National Vulnerability Database."
396
+ )
397
+
398
+ # Create the CVE search interface
399
+ cve_search_demo = gr.Interface(
400
+ fn=search_cves_sync,
401
+ inputs=[
402
+ gr.Textbox(label="Search Keyword", placeholder="e.g., apache, wordpress, buffer overflow"),
403
+ gr.Slider(label="Max Results", minimum=1, maximum=50, value=10, step=1)
404
+ ],
405
+ outputs=gr.Markdown(label="Search Results"),
406
+ title="CVE Search",
407
+ description="Search for CVEs by keyword, vendor, or product name."
408
+ )
409
+
410
+ # Create tabbed interface combining both functions
411
+ demo = gr.TabbedInterface(
412
+ [cve_details_demo, cve_search_demo],
413
+ ["CVE Details", "CVE Search"],
414
+ title="🔒 CVE Details MCP Server"
415
+ )
416
+
417
+ demo.launch(
418
+ server_name="0.0.0.0",
419
+ server_port=7860,
420
+ share=False,
421
+ debug=False,
422
+ mcp_server=True
423
+ )