Spaces:
Sleeping
Sleeping
import json

import requests
class ThreatIntelligence:
    """Collect, merge and normalize threat records pulled from HTTP feeds.

    Raw JSON payloads are accumulated in ``self.threat_data`` by the
    ``fetch_*`` / ``integrate_*`` methods, merged by record ``"id"`` with
    ``aggregate_threat_data`` and reduced to a fixed set of keys with
    ``normalize_threat_data``.
    """

    # Seconds to wait for a feed before giving up. ``requests`` applies no
    # timeout unless one is passed, so without this a stalled server would
    # block the caller indefinitely.
    DEFAULT_TIMEOUT = 10

    # Keys copied into every normalized record, in output order.
    NORMALIZED_FIELDS = (
        "id",
        "type",
        "description",
        "severity",
        "source",
        "timestamp",
    )

    def __init__(self):
        """Create an empty client with no providers configured."""
        self.api_keys = {}     # provider name -> API key
        self.endpoints = {}    # provider name -> endpoint URL
        self.threat_data = []  # raw decoded JSON payloads, in fetch order

    def add_api_key(self, provider, api_key):
        """Register (or overwrite) the API key used for *provider*."""
        self.api_keys[provider] = api_key

    def set_endpoint(self, provider, endpoint):
        """Register (or overwrite) the endpoint URL used for *provider*."""
        self.endpoints[provider] = endpoint

    def _fetch_into_store(self, url, token, failure_prefix, timeout=None):
        """GET *url* with a bearer token and store the decoded JSON body.

        On HTTP 200 the decoded body is appended to ``self.threat_data``;
        on any other status ``"<failure_prefix>: <status>"`` is printed.

        Args:
            url: Address to request.
            token: Value sent as ``Authorization: Bearer <token>``.
            failure_prefix: Text printed before the status code on failure.
            timeout: Seconds to wait; ``None`` uses ``DEFAULT_TIMEOUT``.

        Raises:
            requests.RequestException: On connection errors or timeouts.
            ValueError: If a 200 response body is not valid JSON.
        """
        if timeout is None:
            timeout = self.DEFAULT_TIMEOUT
        headers = {"Authorization": f"Bearer {token}"}
        response = requests.get(url, headers=headers, timeout=timeout)
        if response.status_code == 200:
            self.threat_data.append(response.json())
        else:
            print(f"{failure_prefix}: {response.status_code}")

    def fetch_threat_data(self, provider, timeout=None):
        """Fetch the feed of a configured *provider* into ``threat_data``.

        Prints a message and returns without any network access when either
        the API key or the endpoint for *provider* has not been registered.

        Args:
            provider: Name previously passed to ``add_api_key`` and
                ``set_endpoint``.
            timeout: Seconds to wait; ``None`` uses ``DEFAULT_TIMEOUT``.
        """
        if provider not in self.api_keys or provider not in self.endpoints:
            print(f"API key or endpoint for {provider} not set.")
            return
        self._fetch_into_store(
            self.endpoints[provider],
            self.api_keys[provider],
            f"Failed to fetch data from {provider}",
            timeout,
        )

    def aggregate_threat_data(self):
        """Merge all stored payloads into one dict keyed by record ``"id"``.

        Each payload is expected to be a list of dict records; a payload that
        is itself a dict is treated as a single record. Anything else (and
        any non-dict entry inside a list) is skipped instead of raising.
        When several records share an id, later fields overwrite earlier
        ones. Records without an ``"id"`` key all share the key ``None``.

        The stored payloads are never modified: merging happens on copies.

        Returns:
            dict mapping each id to its merged record.
        """
        merged = {}
        for payload in self.threat_data:
            records = [payload] if isinstance(payload, dict) else payload
            if not isinstance(records, (list, tuple)):
                continue
            for record in records:
                if not isinstance(record, dict):
                    continue
                threat_id = record.get("id")
                if threat_id in merged:
                    merged[threat_id].update(record)
                else:
                    # Copy so later merges do not leak into threat_data.
                    merged[threat_id] = dict(record)
        return merged

    def normalize_threat_data(self, data):
        """Project aggregated records onto the ``NORMALIZED_FIELDS`` keys.

        Args:
            data: Mapping of id -> record, as returned by
                ``aggregate_threat_data``.

        Returns:
            list of dicts, one per record, each holding exactly the
            normalized keys; keys missing from a record map to ``None``.
        """
        return [
            {field: item.get(field) for field in self.NORMALIZED_FIELDS}
            for item in data.values()
        ]

    def integrate_with_misp(self, misp_url, misp_key, timeout=None):
        """Fetch *misp_url* with *misp_key* and store the JSON payload.

        NOTE(review): the key is sent as a ``Bearer`` token, as in the
        original code - confirm the target MISP instance accepts that form.

        Args:
            misp_url: Address to request.
            misp_key: API key for the MISP instance.
            timeout: Seconds to wait; ``None`` uses ``DEFAULT_TIMEOUT``.
        """
        self._fetch_into_store(
            misp_url, misp_key, "Failed to integrate with MISP", timeout
        )

    def integrate_with_opencti(self, opencti_url, opencti_key, timeout=None):
        """Fetch *opencti_url* with *opencti_key* and store the JSON payload.

        Args:
            opencti_url: Address to request.
            opencti_key: API key for the OpenCTI instance.
            timeout: Seconds to wait; ``None`` uses ``DEFAULT_TIMEOUT``.
        """
        self._fetch_into_store(
            opencti_url, opencti_key, "Failed to integrate with OpenCTI", timeout
        )

    def get_threat_data(self):
        """Return the list of raw payloads collected so far (not a copy)."""
        return self.threat_data

    def clear_threat_data(self):
        """Discard every collected payload by rebinding to a new list."""
        self.threat_data = []