#!/usr/bin/env python3 """ CVE Details MCP Server with Gradio Interface A Model Context Protocol server that provides CVE vulnerability details through a web interface. """ import asyncio import logging from typing import Any, Dict, Optional from urllib.parse import quote import aiohttp import gradio as gr import ssl import certifi # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CVEDataFetcher: def __init__(self): self.session: Optional[aiohttp.ClientSession] = None async def __aenter__(self): ssl_context = ssl.create_default_context(cafile=certifi.where()) connector = aiohttp.TCPConnector(ssl=ssl_context) self.session = aiohttp.ClientSession(connector=connector) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.close() async def get_cve_details(self, cve_id: str) -> Dict[str, Any]: """Fetch detailed CVE information from NVD API.""" if not cve_id or not cve_id.upper().startswith('CVE-'): raise ValueError("Invalid CVE ID format. Expected format: CVE-YYYY-NNNN") cve_id = cve_id.upper() url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_id}" async with self.session.get(url) as response: if response.status != 200: raise Exception(f"Failed to fetch CVE data: HTTP {response.status}") data = await response.json() if not data.get('vulnerabilities'): raise Exception(f"CVE {cve_id} not found") vuln = data['vulnerabilities'][0]['cve'] # Extract and format the vulnerability details details = { 'id': vuln['id'], 'sourceIdentifier': vuln.get('sourceIdentifier', 'N/A'), 'published': vuln.get('published', 'N/A'), 'lastModified': vuln.get('lastModified', 'N/A'), 'vulnStatus': vuln.get('vulnStatus', 'N/A'), 'descriptions': [], 'cvss_scores': {}, 'weaknesses': [], 'configurations': [], 'references': [], 'vendor_comments': [] } # Descriptions for desc in vuln.get('descriptions', []): if desc['lang'] == 'en': details['descriptions'].append(desc['value']) # CVSS Scores metrics = vuln.get('metrics', {}) if 'cvssMetricV31' in metrics: cvss31 = metrics['cvssMetricV31'][0]['cvssData'] details['cvss_scores']['v3.1'] = { 'baseScore': cvss31.get('baseScore'), 'baseSeverity': cvss31.get('baseSeverity'), 'vectorString': cvss31.get('vectorString'), 'attackVector': cvss31.get('attackVector'), 'attackComplexity': cvss31.get('attackComplexity'), 'privilegesRequired': cvss31.get('privilegesRequired'), 'userInteraction': cvss31.get('userInteraction'), 'scope': cvss31.get('scope'), 'confidentialityImpact': cvss31.get('confidentialityImpact'), 'integrityImpact': cvss31.get('integrityImpact'), 'availabilityImpact': cvss31.get('availabilityImpact') } if 'cvssMetricV2' in metrics: cvss2 = metrics['cvssMetricV2'][0]['cvssData'] details['cvss_scores']['v2.0'] = { 'baseScore': cvss2.get('baseScore'), 'vectorString': cvss2.get('vectorString'), 'accessVector': cvss2.get('accessVector'), 'accessComplexity': cvss2.get('accessComplexity'), 'authentication': cvss2.get('authentication'), 'confidentialityImpact': cvss2.get('confidentialityImpact'), 'integrityImpact': cvss2.get('integrityImpact'), 'availabilityImpact': cvss2.get('availabilityImpact') } # Weaknesses (CWE) for weakness in vuln.get('weaknesses', []): for desc in weakness.get('description', []): if desc['lang'] == 'en': details['weaknesses'].append({ 'type': weakness.get('type'), 'cwe_id': desc.get('value'), 'description': desc.get('value') }) # Configurations (affected systems) for config in vuln.get('configurations', []): for node in config.get('nodes', []): for cpe_match in node.get('cpeMatch', []): details['configurations'].append({ 'criteria': cpe_match.get('criteria'), 'vulnerable': cpe_match.get('vulnerable'), 'versionStartIncluding': cpe_match.get('versionStartIncluding'), 'versionEndExcluding': cpe_match.get('versionEndExcluding') }) # References for ref in vuln.get('references', []): details['references'].append({ 'url': ref.get('url'), 'source': ref.get('source'), 'tags': ref.get('tags', []) }) # Vendor comments for comment in vuln.get('vendorComments', []): details['vendor_comments'].append({ 'organization': comment.get('organization'), 'comment': comment.get('comment'), 'lastModified': comment.get('lastModified') }) return details async def search_cves(self, keyword: str, limit: int = 10) -> Dict[str, Any]: """Search for CVEs by keyword.""" if not keyword: raise ValueError("Keyword is required for search") # URL encode the keyword encoded_keyword = quote(keyword) url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?keywordSearch={encoded_keyword}&resultsPerPage={limit}" async with self.session.get(url) as response: if response.status != 200: raise Exception(f"Failed to search CVEs: HTTP {response.status}") data = await response.json() results = { 'total_results': data.get('totalResults', 0), 'results_per_page': data.get('resultsPerPage', 0), 'start_index': data.get('startIndex', 0), 'cves': [] } for vuln_data in data.get('vulnerabilities', []): vuln = vuln_data['cve'] cve_summary = { 'id': vuln['id'], 'published': vuln.get('published', 'N/A'), 'lastModified': vuln.get('lastModified', 'N/A'), 'vulnStatus': vuln.get('vulnStatus', 'N/A'), 'description': '', 'cvss_score': None, 'severity': None } # Get first English description for desc in vuln.get('descriptions', []): if desc['lang'] == 'en': cve_summary['description'] = desc['value'][:200] + '...' if len(desc['value']) > 200 else desc['value'] break # Get CVSS score metrics = vuln.get('metrics', {}) if 'cvssMetricV31' in metrics: cvss_data = metrics['cvssMetricV31'][0]['cvssData'] cve_summary['cvss_score'] = cvss_data.get('baseScore') cve_summary['severity'] = cvss_data.get('baseSeverity') elif 'cvssMetricV2' in metrics: cvss_data = metrics['cvssMetricV2'][0]['cvssData'] cve_summary['cvss_score'] = cvss_data.get('baseScore') results['cves'].append(cve_summary) return results # Global server instance cve_fetcher = CVEDataFetcher() def get_cve_details_sync(cve_id: str) -> str: """Synchronous wrapper for getting CVE details.""" return asyncio.run(get_cve_details_gradio(cve_id)) def search_cves_sync(keyword: str, limit: int) -> str: """Synchronous wrapper for searching CVEs.""" return asyncio.run(search_cves_gradio(keyword, limit)) async def get_cve_details_gradio(cve_id: str) -> str: """Gradio interface function for getting CVE details.""" if not cve_id.strip(): return "Please enter a CVE ID (e.g., CVE-2023-1234)" try: async with CVEDataFetcher() as fetcher: details = await fetcher.get_cve_details(cve_id.strip()) # Format the output for better readability output = f"# {details['id']} Details\n\n" output += f"**Status:** {details['vulnStatus']}\n" output += f"**Published:** {details['published']}\n" output += f"**Last Modified:** {details['lastModified']}\n" output += f"**Source:** {details['sourceIdentifier']}\n\n" # Description if details['descriptions']: output += "## Description\n" for desc in details['descriptions']: output += f"{desc}\n\n" # CVSS Scores if details['cvss_scores']: output += "## CVSS Scores\n" for version, scores in details['cvss_scores'].items(): output += f"### CVSS {version}\n" if 'baseScore' in scores: output += f"**Base Score:** {scores['baseScore']}\n" if 'baseSeverity' in scores: output += f"**Severity:** {scores['baseSeverity']}\n" if 'vectorString' in scores: output += f"**Vector:** {scores['vectorString']}\n" output += "\n" # Weaknesses if details['weaknesses']: output += "## Weaknesses (CWE)\n" for weakness in details['weaknesses']: output += f"- **{weakness['cwe_id']}:** {weakness['description']}\n" output += "\n" # Affected Configurations if details['configurations']: output += "## Affected Configurations\n" for config in details['configurations'][:10]: # Limit to first 10 output += f"- {config['criteria']}\n" if config.get('versionStartIncluding'): output += f" - From version: {config['versionStartIncluding']}\n" if config.get('versionEndExcluding'): output += f" - Before version: {config['versionEndExcluding']}\n" output += "\n" # References if details['references']: output += "## References\n" for ref in details['references'][:10]: # Limit to first 10 output += f"- [{ref['source']}]({ref['url']})\n" if ref['tags']: output += f" - Tags: {', '.join(ref['tags'])}\n" output += "\n" return output except Exception as e: return f"Error: {str(e)}" async def search_cves_gradio(keyword: str, limit: int) -> str: """Gradio interface function for searching CVEs.""" if not keyword.strip(): return "Please enter a search keyword" try: async with CVEDataFetcher() as fetcher: results = await fetcher.search_cves(keyword.strip(), limit) output = f"# CVE Search Results for '{keyword}'\n\n" output += f"**Total Results:** {results['total_results']}\n" output += f"**Showing:** {len(results['cves'])} results\n\n" for cve in results['cves']: output += f"## {cve['id']}\n" output += f"**Published:** {cve['published']}\n" output += f"**Status:** {cve['vulnStatus']}\n" if cve['cvss_score']: output += f"**CVSS Score:** {cve['cvss_score']}" if cve['severity']: output += f" ({cve['severity']})" output += "\n" output += f"**Description:** {cve['description']}\n\n" output += "---\n\n" return output except Exception as e: return f"Error: {str(e)}" def create_gradio_interface(): """Create the Gradio web interface.""" with gr.Blocks(title="CVE Details MCP Server", theme=gr.themes.Soft()) as app: gr.Markdown("# 🔒 CVE Details MCP Server") gr.Markdown("Get comprehensive vulnerability details from the National Vulnerability Database") with gr.Tabs(): # CVE Details Tab with gr.Tab("CVE Details"): gr.Markdown("Enter a CVE ID to get detailed vulnerability information") with gr.Row(): cve_input = gr.Textbox( label="CVE ID", placeholder="e.g., CVE-2023-1234", value="CVE-2023-44487" ) get_details_btn = gr.Button("Get Details", variant="primary") cve_output = gr.Markdown(label="CVE Details") get_details_btn.click( fn=get_cve_details_sync, inputs=[cve_input], outputs=[cve_output] ) # CVE Search Tab with gr.Tab("CVE Search"): gr.Markdown("Search for CVEs by keyword, vendor, or product name") with gr.Row(): search_input = gr.Textbox( label="Search Keyword", placeholder="e.g., apache, wordpress, buffer overflow" ) limit_input = gr.Slider( label="Max Results", minimum=1, maximum=50, value=10, step=1 ) search_btn = gr.Button("Search CVEs", variant="primary") search_output = gr.Markdown(label="Search Results") search_btn.click( fn=search_cves_sync, inputs=[search_input, limit_input], outputs=[search_output] ) # MCP Info Tab with gr.Tab("MCP Server Info"): gr.Markdown(""" ## Model Context Protocol (MCP) Server This server implements the MCP specification and provides two main tools: ### Available Tools: 1. **get_cve_details**: Get comprehensive details for a specific CVE 2. **search_cves**: Search for CVEs by keyword ### Usage as MCP Server: This interface automatically runs as an MCP server when launched. ### Features: - Real-time CVE data from NIST NVD - CVSS v2.0 and v3.1 scoring - CWE weakness classification - Affected product configurations - Reference links and vendor comments - Comprehensive search functionality ### Data Source: [NIST National Vulnerability Database (NVD)](https://nvd.nist.gov/) """) return app # Create the main CVE details interface cve_details_demo = gr.Interface( fn=get_cve_details_sync, inputs=gr.Textbox(label="CVE ID", placeholder="e.g., CVE-2023-1234"), outputs=gr.Markdown(label="CVE Details"), title="CVE Details Lookup", description="Enter a CVE ID to get comprehensive vulnerability information from the National Vulnerability Database." ) # Create the CVE search interface cve_search_demo = gr.Interface( fn=search_cves_sync, inputs=[ gr.Textbox(label="Search Keyword", placeholder="e.g., apache, wordpress, buffer overflow"), gr.Slider(label="Max Results", minimum=1, maximum=50, value=10, step=1) ], outputs=gr.Markdown(label="Search Results"), title="CVE Search", description="Search for CVEs by keyword, vendor, or product name." ) # Create tabbed interface combining both functions demo = gr.TabbedInterface( [cve_details_demo, cve_search_demo], ["CVE Details", "CVE Search"], title="🔒 CVE Details MCP Server" ) demo.launch( server_name="0.0.0.0", server_port=7860, share=False, debug=False, mcp_server=True )