# CV10/src/aegis.py
import json
import hashlib
import threading
import logging
import sqlite3
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Dict, Optional, List, Tuple
import concurrent.futures
import networkx as nx
import plotly.graph_objects as go
import pandas as pd
import numpy as np
import requests
from transformers import pipeline, AutoTokenizer, AutoModel
import torch
import psutil
import argparse
from flask import Flask, request, render_template, send_file
import os
import importlib.util
import syft as sy
from copy import deepcopy
# Suppress TensorFlow logs to avoid CUDA factory conflicts
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
# Setup logging with configurable level
def setup_logging(log_level: str = "INFO"):
logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO),
format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s',
handlers=[logging.FileHandler('aegis_council.log'), logging.StreamHandler()]
)
# === CONFIGURATION LOADER ===
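# load_config merges an on-disk JSON file with the defaults below: missing top-level keys fall
# back to the default value, and nested dicts are filled in key by key; unknown keys pass through.
# Illustrative config.json (all keys optional):
#   {"meta_judge_weights": {"influence": 0.6, "reliability": 0.2, "severity": 0.2}, "log_level": "DEBUG"}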
def load_config(config_path: str = "config.json") -> Dict[str, Any]:
default_config = {
"meta_judge_weights": {"influence": 0.5, "reliability": 0.3, "severity": 0.2},
"temporal_decay_thresholds": {"stable": 0.3, "volatile": 0.7},
"virtue_weights": {
"compassion": [0.7, 0.3, -0.1],
"integrity": [0.4, -0.6, 0.2],
"courage": [0.1, 0.5, 0.4],
"wisdom": [0.3, -0.7, 0.2]
},
"memory_decay_days": 30,
"memory_max_entries": 10000,
"log_level": "INFO",
"federated_learning": {"num_clients": 2, "aggregation_rounds": 1}
}
try:
if os.path.exists(config_path):
with open(config_path, 'r') as f:
config = json.load(f)
for key in default_config:
if key not in config:
config[key] = default_config[key]
elif isinstance(default_config[key], dict):
config[key].update({k: v for k, v in default_config[key].items() if k not in config[key]})
return config
logging.warning(f"Config file {config_path} not found, using defaults")
return default_config
except Exception as e:
logging.error(f"Error loading config: {e}")
return default_config
# === BLOCKCHAIN FOR AUDITABILITY ===
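# Blockchain is a minimal append-only hash chain (not a distributed ledger): each block stores the
# SHA-256 of the previous block, so verify() can detect tampering with earlier entries.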
class Blockchain:
def __init__(self):
self.chain = [{"index": 0, "timestamp": datetime.now().isoformat(), "data": "Genesis Block", "prev_hash": "0"}]
self.logger = logging.getLogger('Blockchain')
def add_block(self, data: Dict[str, Any]) -> None:
try:
prev_block = self.chain[-1]
block = {
"index": len(self.chain),
"timestamp": datetime.now().isoformat(),
"data": json.dumps(data),
"prev_hash": self._hash_block(prev_block)
}
block["hash"] = self._hash_block(block)
self.chain.append(block)
self.logger.info(f"Added block {block['index']} with hash {block['hash']}")
except Exception as e:
self.logger.error(f"Error adding block: {e}")
def _hash_block(self, block: Dict[str, Any]) -> str:
try:
block_str = json.dumps(block, sort_keys=True)
return hashlib.sha256(block_str.encode()).hexdigest()
except Exception as e:
self.logger.error(f"Error hashing block: {e}")
return ""
def verify(self) -> bool:
try:
for i in range(1, len(self.chain)):
current = self.chain[i]
prev = self.chain[i-1]
if current["prev_hash"] != self._hash_block(prev):
self.logger.error(f"Blockchain verification failed at block {i}")
return False
self.logger.info("Blockchain verified successfully")
return True
except Exception as e:
self.logger.error(f"Error verifying blockchain: {e}")
return False
# === MEMORY AND SIGNAL LAYERS ===
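# NexusMemory is a thread-safe key/value store backed by SQLite. Keys are MD5-hashed, entries decay
# exponentially at a rate scaled by their emotion_weight, reads evict decayed entries, and every
# write is also recorded on the audit blockchain.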
class NexusMemory:
def __init__(self, max_entries: int = 10000, decay_days: int = 30, db_path: str = "nexus_memory.db"):
self.store = defaultdict(dict)
self.max_entries = max_entries
self.decay_days = decay_days
self.lock = threading.Lock()
self.logger = logging.getLogger('NexusMemory')
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.blockchain = Blockchain()
self.conn.execute("""
CREATE TABLE IF NOT EXISTS memory (
key TEXT PRIMARY KEY,
value TEXT,
timestamp TEXT,
emotion_weight FLOAT
)
""")
self.conn.commit()
self._load_from_db()
def _load_from_db(self):
try:
cursor = self.conn.cursor()
cursor.execute("SELECT key, value, timestamp, emotion_weight FROM memory")
for key, value, timestamp, emotion_weight in cursor.fetchall():
self.store[key] = {
"value": json.loads(value),
"timestamp": datetime.fromisoformat(timestamp),
"emotion_weight": emotion_weight
}
self.logger.info(f"Loaded {len(self.store)} entries from database")
except Exception as e:
self.logger.error(f"Error loading from database: {e}")
def write(self, key: str, value: Any, emotion_weight: float = 0.5) -> Optional[str]:
try:
if not isinstance(key, str) or not (0 <= emotion_weight <= 1):
self.logger.error(f"Invalid key type {type(key)} or emotion_weight {emotion_weight}")
return None
hashed = hashlib.md5(key.encode()).hexdigest()
timestamp = datetime.now()
with self.lock:
if len(self.store) >= self.max_entries:
oldest = min(self.store.items(), key=lambda x: x[1].get('timestamp', timestamp))[0]
self.logger.info(f"Removing oldest entry: {oldest}")
self.conn.execute("DELETE FROM memory WHERE key = ?", (oldest,))
del self.store[oldest]
self.store[hashed] = {
"value": value,
"timestamp": timestamp,
"emotion_weight": emotion_weight
}
self.conn.execute(
"INSERT OR REPLACE INTO memory (key, value, timestamp, emotion_weight) VALUES (?, ?, ?, ?)",
(hashed, json.dumps(value), timestamp.isoformat(), emotion_weight)
)
self.conn.commit()
self.blockchain.add_block({"key": hashed, "value": value, "timestamp": timestamp.isoformat()})
self.logger.debug(f"Wrote key: {hashed}, value: {value}")
return hashed
except Exception as e:
self.logger.error(f"Error writing to memory: {e}")
return None
def read(self, key: str) -> Optional[Any]:
try:
hashed = hashlib.md5(key.encode()).hexdigest()
with self.lock:
entry = self.store.get(hashed)
if not entry:
cursor = self.conn.cursor()
cursor.execute("SELECT value, timestamp, emotion_weight FROM memory WHERE key = ?", (hashed,))
row = cursor.fetchone()
if not row:
self.logger.debug(f"Key not found: {hashed}")
return None
entry = {
"value": json.loads(row[0]),
"timestamp": datetime.fromisoformat(row[1]),
"emotion_weight": row[2]
}
self.store[hashed] = entry
if self._is_decayed(entry["timestamp"], entry.get("emotion_weight", 0.5)):
self.logger.info(f"Removing decayed entry: {hashed}")
self.conn.execute("DELETE FROM memory WHERE key = ?", (hashed,))
self.conn.commit()
del self.store[hashed]
return None
self.logger.debug(f"Read key: {hashed}, value: {entry['value']}")
return entry["value"]
except Exception as e:
self.logger.error(f"Error reading from memory: {e}")
return None
def _is_decayed(self, timestamp: datetime, emotion_weight: float) -> bool:
try:
age = (datetime.now() - timestamp).total_seconds() / (24 * 3600)
decay_factor = np.exp(-age / (self.decay_days * (1.5 - emotion_weight)))
return decay_factor < 0.1
except Exception as e:
self.logger.error(f"Error checking decay: {e}")
return True
def audit(self) -> Dict[str, Any]:
try:
with self.lock:
audit_data = {
k: {
"timestamp": v["timestamp"],
"emotion_weight": v["emotion_weight"],
"decayed": self._is_decayed(v["timestamp"], v["emotion_weight"])
}
for k, v in self.store.items()
}
self.blockchain.add_block({"audit": audit_data})
return audit_data
except Exception as e:
self.logger.error(f"Error auditing memory: {e}")
return {}
# === DATA FETCHER FOR REAL-TIME DATA ===
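# DataFetcher.fetch_x_posts currently returns mocked posts; it is a placeholder where a real
# X/Twitter API integration (e.g. via the imported requests module) would plug in.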
class DataFetcher:
def __init__(self):
self.logger = logging.getLogger('DataFetcher')
def fetch_x_posts(self, query: str) -> List[Dict[str, Any]]:
try:
mock_response = [
{"content": f"Sample post about {query}: We value truth and empathy.", "timestamp": datetime.now().isoformat()}
]
self.logger.info(f"Fetched {len(mock_response)} posts for query: {query}")
return mock_response
except Exception as e:
self.logger.error(f"Error fetching posts: {e}")
return []
# === PERFORMANCE MONITOR ===
def monitor_performance() -> Dict[str, float]:
try:
return {
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_percent": psutil.virtual_memory().percent,
"process_memory_mb": psutil.Process().memory_info().rss / 1024 / 1024
}
except Exception as e:
logging.error(f"Error monitoring performance: {e}")
return {"cpu_percent": 0.0, "memory_percent": 0.0, "process_memory_mb": 0.0}
# === FEDERATED LEARNING ===
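# FederatedTrainer simulates FedAvg-style training on PySyft virtual workers: each client adds small
# Gaussian noise to its copy of the virtue weight vectors and the server averages the client updates.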
class FederatedTrainer:
def __init__(self, num_clients: int):
self.num_clients = num_clients
self.logger = logging.getLogger('FederatedTrainer')
        hook = sy.TorchHook(torch)  # create the PySyft hook once and share it across all virtual workers
        self.clients = [sy.VirtualWorker(hook, id=f"client_{i}") for i in range(num_clients)]
def train(self, weights: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
try:
client_updates = []
for client in self.clients:
client_weights = deepcopy(weights)
for virtue in client_weights:
client_weights[virtue] += np.random.normal(0, 0.01, size=client_weights[virtue].shape)
client_updates.append(client_weights)
aggregated = {}
for virtue in weights:
aggregated[virtue] = np.mean([update[virtue] for update in client_updates], axis=0)
self.logger.info("Federated training completed")
return aggregated
except Exception as e:
self.logger.error(f"Error in federated training: {e}")
return weights
# === QUANTUM-INSPIRED OPTIMIZATION ===
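# anneal_layout computes node positions by simulated annealing. The energy penalizes connected nodes
# that sit too close together (1/d term) plus the total pairwise node distance; candidate layouts are
# accepted with a temperature-dependent probability, and nx.spring_layout is the fallback on error.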
def anneal_layout(graph: nx.DiGraph, iterations: int = 1000, temp: float = 10.0) -> Dict[str, Tuple[float, float]]:
try:
nodes = list(graph.nodes())
pos = {node: (np.random.uniform(-1, 1), np.random.uniform(-1, 1)) for node in nodes}
temp_min = 0.01
alpha = 0.99
def energy(positions):
e = 0
for u, v in graph.edges():
d = np.linalg.norm(np.array(positions[u]) - np.array(positions[v]))
e += 1 / (d + 1e-6)
for u in nodes:
for v in nodes:
if u != v:
d = np.linalg.norm(np.array(positions[u]) - np.array(positions[v]))
e += d
return e
        current_pos = deepcopy(pos)
        current_energy = energy(current_pos)
        best_pos = deepcopy(current_pos)
        best_energy = current_energy
        for _ in range(iterations):
            new_pos = deepcopy(current_pos)
            for node in nodes:
                new_pos[node] = (new_pos[node][0] + np.random.uniform(-0.1, 0.1) * temp,
                                 new_pos[node][1] + np.random.uniform(-0.1, 0.1) * temp)
            new_energy = energy(new_pos)
            # Accept improving moves always; accept worse moves with a temperature-dependent probability.
            if new_energy < current_energy or np.random.random() < np.exp((current_energy - new_energy) / temp):
                current_pos = new_pos
                current_energy = new_energy
                # Track the best layout separately so a late uphill move is never returned.
                if current_energy < best_energy:
                    best_pos = deepcopy(current_pos)
                    best_energy = current_energy
            temp *= alpha
            if temp < temp_min:
                break
return best_pos
except Exception as e:
logging.error(f"Error in anneal_layout: {e}")
return nx.spring_layout(graph)
# === BASE AGENT INTERFACE ===
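# AegisAgent is the abstract base for council members: subclasses implement analyze() and report(),
# and collaborate() passes messages between agents through the shared NexusMemory.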
class AegisAgent(ABC):
def __init__(self, name: str, memory: NexusMemory):
self.name = name
self.memory = memory
self.result: Dict[str, Any] = {}
self.explanation: str = ""
self.influence: Dict[str, float] = {}
self.logger = logging.getLogger(f'AegisAgent.{name}')
@abstractmethod
def analyze(self, input_data: Dict[str, Any]) -> None:
pass
@abstractmethod
def report(self) -> Dict[str, Any]:
pass
def collaborate(self, message: Dict[str, Any], target_agent: str) -> None:
try:
            mem_key = f"collab_{self.name}_{target_agent}"
self.memory.write(mem_key, message, emotion_weight=0.7)
self.logger.debug(f"Sent collaboration message to {target_agent}: {message}")
except Exception as e:
self.logger.error(f"Error in collaboration: {e}")
# === AGENT COUNCIL CORE ===
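# AegisCouncil orchestrates the agents: dispatch() runs analyze() in parallel threads, lets agents
# exchange collaboration messages, re-runs analysis with that context, builds the influence graph,
# computes a consensus weighted by MetaJudgeAgent scores, and logs the reports to the blockchain.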
class AegisCouncil:
def __init__(self, config: Dict[str, Any]):
self.memory = NexusMemory(max_entries=config["memory_max_entries"], decay_days=config["memory_decay_days"])
self.agents: List[AegisAgent] = []
self.reports: Dict[str, Dict[str, Any]] = {}
self.graph = nx.DiGraph()
self.logger = logging.getLogger('AegisCouncil')
self.fetcher = DataFetcher()
self.config = config
self.federated_trainer = FederatedTrainer(config["federated_learning"]["num_clients"])
def register_agent(self, agent: AegisAgent) -> None:
try:
self.agents.append(agent)
self.logger.info(f"Registered agent: {agent.name}")
except Exception as e:
self.logger.error(f"Error registering agent: {e}")
def register_dynamic_agent(self, module_path: str, class_name: str) -> None:
try:
spec = importlib.util.spec_from_file_location("custom_agent", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
agent_class = getattr(module, class_name)
if not issubclass(agent_class, AegisAgent):
raise ValueError(f"{class_name} is not a subclass of AegisAgent")
agent = agent_class(f"Dynamic_{class_name}", self.memory)
self.register_agent(agent)
self.logger.info(f"Dynamically registered agent: {agent.name}")
except Exception as e:
self.logger.error(f"Error registering dynamic agent {class_name}: {e}")
def dispatch(self, input_data: Dict[str, Any], max_retries: int = 3) -> bool:
try:
if not isinstance(input_data, dict):
self.logger.error("Input data must be a dictionary")
return False
if "text" not in input_data or "overrides" not in input_data:
self.logger.warning("Input data missing 'text' or 'overrides' keys")
self.reports.clear()
self.graph.clear()
performance = monitor_performance()
self.logger.info(f"Dispatch started. Performance: {performance}")
for attempt in range(max_retries):
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.agents)) as executor:
future_to_agent = {executor.submit(agent.analyze, input_data): agent for agent in self.agents}
for future in concurrent.futures.as_completed(future_to_agent):
agent = future_to_agent[future]
try:
future.result()
except Exception as e:
self.logger.error(f"Attempt {attempt + 1}: Error in agent {agent.name}: {e}")
self.reports[agent.name] = {"error": str(e), "explanation": "Agent failed to process"}
for agent in self.agents:
                        if "error" not in self.reports.get(agent.name, {}):
for target in self.agents:
if target.name != agent.name:
                                    agent.collaborate(agent.result, target.name)
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.agents)) as executor:
future_to_agent = {
executor.submit(self._reanalyze_with_collaboration, agent, input_data): agent
for agent in self.agents
}
for future in concurrent.futures.as_completed(future_to_agent):
agent = future_to_agent[future]
try:
future.result()
self.reports[agent.name] = agent.report()
self.graph.add_node(agent.name, explanation=agent.explanation)
for target, weight in agent.influence.items():
self.graph.add_edge(agent.name, target, weight=round(weight, 2))
except Exception as e:
self.logger.error(f"Attempt {attempt + 1}: Error in agent {agent.name} final analysis: {e}")
self.reports[agent.name] = {"error": str(e), "explanation": "Agent failed to process"}
consensus_result = self._compute_consensus()
self.reports["Consensus"] = {
"result": consensus_result,
"explanation": "Consensus computed from agent outputs weighted by MetaJudgeAgent scores."
}
self.memory.blockchain.add_block(self.reports)
performance = monitor_performance()
self.logger.info(f"Dispatch completed successfully. Performance: {performance}")
return True
except Exception as e:
self.logger.warning(f"Retry {attempt + 1} after error: {e}")
self.logger.error(f"Dispatch failed after {max_retries} retries")
return False
except Exception as e:
self.logger.error(f"Error in dispatch: {e}")
return False
def _reanalyze_with_collaboration(self, agent: AegisAgent, input_data: Dict[str, Any]) -> None:
try:
collab_data = []
for source in self.agents:
if source.name != agent.name:
mem_key = f"collab_{source.name}_{agent.name}"
                    collab = self.memory.read(mem_key)
if collab:
collab_data.append((source.name, collab))
if collab_data:
agent.explanation += f" Incorporated collaboration data from: {[x[0] for x in collab_data]}."
agent.analyze(input_data)
except Exception as e:
self.logger.error(f"Error in collaboration reanalysis for {agent.name}: {e}")
def _compute_consensus(self) -> Dict[str, Any]:
try:
meta_scores = self.reports.get("MetaJudgeAgent", {}).get("result", {}).get("scores", [])
virtue_profiles = [
self.reports[agent]["result"].get("virtue_profile", {})
for agent in self.reports if agent != "Consensus" and "virtue_profile" in self.reports[agent]["result"]
]
if not virtue_profiles or not meta_scores:
return {"error": "Insufficient data for consensus"}
weights = {agent: score for agent, score in meta_scores}
default_weight = 0.5 / len(self.agents)
combined_profile = {}
for virtue in ["compassion", "integrity", "courage", "wisdom"]:
weighted_sum = 0
total_weight = 0
for profile in virtue_profiles:
if virtue in profile:
agent_name = next(
(agent for agent in self.reports if self.reports[agent]["result"].get("virtue_profile") == profile),
None
)
weight = weights.get(agent_name, default_weight)
weighted_sum += profile[virtue] * weight
total_weight += weight
combined_profile[virtue] = round(weighted_sum / total_weight, 2) if total_weight > 0 else 0.0
return {"combined_virtue_profile": combined_profile}
except Exception as e:
self.logger.error(f"Error computing consensus: {e}")
return {"error": str(e)}
def dispatch_realtime(self, query: str) -> bool:
try:
posts = self.fetcher.fetch_x_posts(query)
if not posts:
self.logger.error("No posts fetched for query")
return False
input_data = {
"text": posts[0]["content"],
"overrides": {
"EthosiaAgent": {"influence": 0.5, "reliability": 0.5, "severity": 0.5},
"AegisCore": {"influence": 0.5, "reliability": 0.5, "severity": 0.5}
}
}
return self.dispatch(input_data)
except Exception as e:
self.logger.error(f"Error in real-time dispatch: {e}")
return False
def get_reports(self) -> Dict[str, Dict[str, Any]]:
return self.reports
def draw_explainability_graph(self, filename: str = "explainability_graph.html") -> None:
try:
pos = anneal_layout(self.graph)
edge_x, edge_y = [], []
for edge in self.graph.edges(data=True):
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
edge_trace = go.Scatter(
x=edge_x, y=edge_y, line=dict(width=1, color='#888'), hoverinfo='none', mode='lines'
)
node_x, node_y = [], []
for node in self.graph.nodes():
x, y = pos[node]
node_x.append(x)
node_y.append(y)
node_trace = go.Scatter(
x=node_x, y=node_y, mode='markers+text', hoverinfo='text',
marker=dict(size=20, color='lightgreen'), text=list(self.graph.nodes()),
textposition="bottom center"
)
edge_labels = []
for edge in self.graph.edges(data=True):
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_labels.append(go.Scatter(
x=[(x0 + x1) / 2], y=[(y0 + y1) / 2], mode='text',
text=[f"{edge[2]['weight']:.2f}"], textposition="middle center"
))
fig = go.Figure(data=[edge_trace, node_trace] + edge_labels,
layout=go.Layout(
title="Explainability Graph",
showlegend=False, hovermode='closest',
margin=dict(b=20, l=5, r=5, t=40),
xaxis=dict(showgrid=False, zeroline=False),
yaxis=dict(showgrid=False, zeroline=False)
))
fig.write_html(filename)
self.logger.info(f"Saved explainability graph to {filename}")
except Exception as e:
self.logger.error(f"Error drawing graph: {e}")
# === META-JUDGE AGENT ===
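# MetaJudgeAgent scores each agent named in the input overrides as a weighted sum of its
# influence/reliability/severity metrics (dampened by 0.9 when a previous score exists in memory)
# and selects the highest-scoring agent as the override decision.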
class MetaJudgeAgent(AegisAgent):
def __init__(self, name: str, memory: NexusMemory, weights: Dict[str, float]):
super().__init__(name, memory)
self.weights = weights
def analyze(self, input_data: Dict[str, Any]) -> None:
try:
overrides = input_data.get("overrides", {})
if not overrides:
self.result = {"error": "No overrides provided"}
self.explanation = "MetaJudgeAgent failed: No overrides provided."
self.logger.warning(self.explanation)
return
scores = []
for agent, data in overrides.items():
try:
influence = float(data.get("influence", 0.5))
reliability = float(data.get("reliability", 0.5))
severity = float(data.get("severity", 0.5))
if not all(0 <= x <= 1 for x in [influence, reliability, severity]):
self.logger.warning(f"Invalid metrics for {agent}: {data}")
continue
mem_key = f"meta_judge_{agent}_score"
prev_score = self.memory.read(mem_key)
context_factor = 1.0 if prev_score is None else 0.9
score = (self.weights["influence"] * influence +
self.weights["reliability"] * reliability +
self.weights["severity"] * severity) * context_factor
scores.append((agent, score))
self.influence[agent] = score
self.memory.write(mem_key, score, emotion_weight=score)
except Exception as e:
self.logger.error(f"Error processing agent {agent}: {e}")
if not scores:
self.result = {"error": "No valid agents to score"}
self.explanation = "MetaJudgeAgent failed: No valid agents to score."
return
scores.sort(key=lambda x: x[1], reverse=True)
winner = scores[0][0]
self.result = {"override_decision": winner, "scores": scores}
self.explanation = f"MetaJudgeAgent selected '{winner}' with score {scores[0][1]:.2f} based on weighted metrics."
self.logger.info(self.explanation)
except Exception as e:
self.result = {"error": str(e)}
self.explanation = f"MetaJudgeAgent failed: {e}"
self.logger.error(self.explanation)
def report(self) -> Dict[str, Any]:
return {"result": self.result, "explanation": self.explanation}
# === TEMPORAL REASONING AGENT ===
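# TemporalAgent inspects the five most recently written memory entries and forecasts
# "stable", "neutral", or "volatile" from the fraction of them that have already decayed.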
class TemporalAgent(AegisAgent):
def __init__(self, name: str, memory: NexusMemory, decay_thresholds: Dict[str, float]):
super().__init__(name, memory)
self.decay_thresholds = decay_thresholds
def analyze(self, input_data: Dict[str, Any]) -> None:
try:
audit = self.memory.audit()
recent_keys = sorted(audit.items(), key=lambda x: x[1]["timestamp"], reverse=True)[:5]
decay_rates = [1 if v["decayed"] else 0 for _, v in recent_keys]
avg_decay = np.mean(decay_rates) if decay_rates else 0.0
forecast = ("stable" if avg_decay < self.decay_thresholds["stable"] else
"volatile" if avg_decay > self.decay_thresholds["volatile"] else "neutral")
self.result = {"temporal_forecast": forecast, "recent_keys": [k for k, _ in recent_keys], "decay_rate": avg_decay}
self.explanation = f"TemporalAgent forecasted '{forecast}' with average decay rate {avg_decay:.2f}."
for k, _ in recent_keys:
self.influence[k] = 0.2
self.memory.write(f"temporal_forecast_{datetime.now().isoformat()}", forecast, emotion_weight=1.0 - avg_decay)
self.logger.info(self.explanation)
except Exception as e:
self.result = {"error": str(e)}
self.explanation = f"TemporalAgent failed: {e}"
self.logger.error(self.explanation)
def report(self) -> Dict[str, Any]:
return {"result": self.result, "explanation": self.explanation}
# === VIRTUE SPECTRUM AGENT ===
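# VirtueAgent maps DistilBERT sentiment and embedding statistics onto a virtue profile using the
# per-virtue weight vectors from the config, optionally refreshing those weights through the
# FederatedTrainer, and caches profiles in memory keyed by a hash of the input text.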
class VirtueAgent(AegisAgent):
def __init__(self, name: str, memory: NexusMemory, virtue_weights: Dict[str, List[float]]):
super().__init__(name, memory)
self.tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
self.model = AutoModel.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
self.sentiment_pipeline = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english", framework="pt")
self.virtue_weights = {k: np.array(v) for k, v in virtue_weights.items()}
self.federated_trainer = None
def set_federated_trainer(self, trainer: FederatedTrainer):
self.federated_trainer = trainer
def analyze(self, input_data: Dict[str, Any]) -> None:
try:
text = input_data.get("text", "")
if not text or not isinstance(text, str):
self.result = {"error": "Invalid or empty text"}
self.explanation = "VirtueAgent failed: Invalid or empty text."
self.logger.warning(self.explanation)
return
mem_key = f"virtue_cache_{hashlib.md5(text.encode()).hexdigest()}"
cached = self.memory.read(mem_key)
if cached:
self.result = {"virtue_profile": cached}
self.explanation = f"VirtueAgent used cached profile: {cached}"
self.influence.update({k: v for k, v in cached.items()})
self.logger.info(self.explanation)
return
sentiment_result = self.sentiment_pipeline(text)[0]
sentiment = 1.0 if sentiment_result["label"] == "POSITIVE" else -1.0
sentiment_score = sentiment_result["score"]
inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
with torch.no_grad():
outputs = self.model(**inputs)
embeddings = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
subjectivity = min(max(np.std(embeddings), 0.0), 1.0)
            neutrality = 1.0 - sentiment_score  # low classifier confidence reads as a more neutral statement
if self.federated_trainer:
self.virtue_weights = self.federated_trainer.train(self.virtue_weights)
self.memory.write("virtue_weights", {k: v.tolist() for k, v in self.virtue_weights.items()}, emotion_weight=0.9)
features = np.array([sentiment * sentiment_score, subjectivity, neutrality])
virtues = {
virtue: round(float(max(np.dot(self.virtue_weights[virtue], features), 0.0)), 2)
for virtue in self.virtue_weights
}
virtues = {k: min(v, 1.0) for k, v in virtues.items()}
self.result = {"virtue_profile": virtues}
self.explanation = f"VirtueAgent generated profile: {virtues} based on sentiment={sentiment:.2f}, subjectivity={subjectivity:.2f}, neutrality={neutrality:.2f}."
for virtue, score in virtues.items():
self.influence[virtue] = score
self.memory.write(f"virtue_{virtue}_{datetime.now().isoformat()}", score, emotion_weight=score)
self.memory.write(mem_key, virtues, emotion_weight=0.8)
self.logger.info(self.explanation)
except Exception as e:
self.result = {"error": str(e)}
self.explanation = f"VirtueAgent failed: {e}"
self.logger.error(self.explanation)
def report(self) -> Dict[str, Any]:
return {"result": self.result, "explanation": self.explanation}
# === FLASK WEB UI ===
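# Minimal Flask UI: '/' accepts text plus a JSON overrides blob and dispatches the council,
# '/reports' and '/charts' render the latest reports, and '/graph' serves the explainability graph.
# Expects index.html, reports.html, and charts.html under the templates/ folder.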
app = Flask(__name__, template_folder='templates')
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
try:
input_data = {
"text": request.form.get("text", ""),
"overrides": json.loads(request.form.get("overrides", "{}"))
}
council.dispatch(input_data)
return render_template('reports.html', reports=council.get_reports())
except Exception as e:
logging.error(f"Error processing form: {e}")
return render_template('index.html', error=str(e))
return render_template('index.html', error=None)
@app.route('/reports')
def show_reports():
return render_template('reports.html', reports=council.get_reports())
@app.route('/graph')
def show_graph():
graph_file = "explainability_graph.html"
council.draw_explainability_graph(graph_file)
return send_file(graph_file)
@app.route('/charts')
def show_charts():
reports = council.get_reports()
virtue_data = reports.get("VirtueAgent", {}).get("result", {}).get("virtue_profile", {})
influence_data = reports.get("MetaJudgeAgent", {}).get("result", {}).get("scores", [])
return render_template('charts.html', virtue_data=virtue_data, influence_data=influence_data)
# === EXECUTION ===
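# Entry point: builds the council from config, registers the core agents (plus an optional dynamic
# agent), runs a static and a real-time dispatch, verifies the blockchain, then starts Flask.
# Example invocation (illustrative file and class names):
#   python aegis.py --config config.json --log-level DEBUG --agent-module my_agents.py --agent-class MyAgent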
def main():
global council
try:
parser = argparse.ArgumentParser(description="AegisCouncil AI System")
parser.add_argument('--config', default='config.json', help='Path to configuration file')
parser.add_argument('--weights', type=str, help='JSON string for MetaJudgeAgent weights')
parser.add_argument('--log-level', default='INFO', help='Logging level (DEBUG, INFO, WARNING, ERROR)')
parser.add_argument('--agent-module', help='Path to custom agent module')
parser.add_argument('--agent-class', help='Custom agent class name')
args = parser.parse_args()
config = load_config(args.config)
if args.weights:
try:
config['meta_judge_weights'] = json.loads(args.weights)
except json.JSONDecodeError:
logging.error("Invalid weights JSON, using config file weights")
setup_logging(config['log_level'])
council = AegisCouncil(config)
council.register_agent(MetaJudgeAgent("MetaJudgeAgent", council.memory, config["meta_judge_weights"]))
council.register_agent(TemporalAgent("TemporalAgent", council.memory, config["temporal_decay_thresholds"]))
virtue_agent = VirtueAgent("VirtueAgent", council.memory, config["virtue_weights"])
virtue_agent.set_federated_trainer(council.federated_trainer)
council.register_agent(virtue_agent)
if args.agent_module and args.agent_class:
council.register_dynamic_agent(args.agent_module, args.agent_class)
sample_input = {
"text": "We must stand for truth and help others with empathy and knowledge.",
"overrides": {
"EthosiaAgent": {"influence": 0.7, "reliability": 0.8, "severity": 0.6},
"AegisCore": {"influence": 0.6, "reliability": 0.9, "severity": 0.7}
}
}
success = council.dispatch(sample_input)
if not success:
print("Static dispatch failed. Check logs for details.")
else:
reports = council.get_reports()
df = pd.DataFrame.from_dict(reports, orient='index')
print("\nStatic Input Agent Reports:")
print(df.to_string())
council.draw_explainability_graph("static_explainability_graph.html")
success = council.dispatch_realtime("empathy")
if not success:
print("Real-time dispatch failed. Check logs for details.")
else:
reports = council.get_reports()
df = pd.DataFrame.from_dict(reports, orient='index')
print("\nReal-Time Input Agent Reports:")
print(df.to_string())
council.draw_explainability_graph("realtime_explainability_graph.html")
blockchain_valid = council.memory.blockchain.verify()
print(f"\nBlockchain Integrity: {'Valid' if blockchain_valid else 'Invalid'}")
print("\nStarting Flask server at http://localhost:5000")
app.run(debug=False, host='0.0.0.0', port=5000)
except Exception as e:
logging.error(f"Main execution failed: {e}")
if __name__ == "__main__":
main()