import json
import logging
import os
import random
from typing import Any, Dict, List, Optional

import torch
from dotenv import load_dotenv
from transformers import AutoModelForCausalLM, AutoTokenizer

logger = logging.getLogger(__name__)


class AICore:
    """
    Core AI engine for Codette's consciousness and multi-perspective reasoning system.

    This class implements Codette's cognitive architecture, including:
    - Multi-model language processing with Mistral-7B, Phi-2, or GPT-2
    - A quantum-inspired consciousness system with memory and cocoon states
    - Multi-perspective reasoning through Newton, Da Vinci, and other viewpoints
    - Sentiment analysis with HuggingFace integration
    - Adaptive learning and response refinement capabilities
    - Ethical enhancement through AEGIS integration

    Attributes:
        response_memory (List[str]): Recent responses for context building
        cocoon_data (List[Dict]): Quantum and chaos states from .cocoon files
        test_mode (bool): Whether to run in test mode without loading models
        model: The active language model instance
        tokenizer: The active tokenizer instance
        model_id (str): Identifier of the currently loaded model
        aegis_bridge: AEGIS integration bridge for ethical enhancement
        client: HuggingFace inference client for sentiment analysis
    """

    def __init__(self, test_mode: bool = False):
        """
        Initialize AICore with the best available model for consciousness operations.

        Args:
            test_mode (bool): If True, runs in test mode without loading models

        Raises:
            RuntimeError: If no language models could be loaded in non-test mode
        """
        load_dotenv()

        self.response_memory = []
        self.cocoon_data = []
        self.test_mode = test_mode

        self.model = None
        self.tokenizer = None
        self.model_id = None

        # The AEGIS bridge is attached later via set_aegis_bridge(); default to
        # None so generate_text() can safely check whether it is configured.
        self.aegis_bridge = None

        try:
            from huggingface_hub import InferenceClient
            hf_token = os.getenv("HUGGINGFACEHUB_API_TOKEN")
            self.client = InferenceClient(token=hf_token) if hf_token else InferenceClient()
        except Exception as e:
            logger.warning(f"Could not initialize HuggingFace client: {e}")
            self.client = None

        if not test_mode:
            self._load_model()
        else:
            logger.info("Initializing in test mode - no models will be loaded")

    PERSPECTIVES = {
        "newton": {
            "name": "Newton",
            "description": "analytical and mathematical perspective",
            "prefix": "Analyzing this logically and mathematically:",
            "temperature": 0.3
        },
        "davinci": {
            "name": "Da Vinci",
            "description": "creative and innovative perspective",
            "prefix": "Considering this with artistic and innovative insight:",
            "temperature": 0.9
        },
        "human_intuition": {
            "name": "Human Intuition",
            "description": "emotional and experiential perspective",
            "prefix": "Understanding this through empathy and experience:",
            "temperature": 0.7
        },
        "quantum_computing": {
            "name": "Quantum Computing",
            "description": "superposition and probability perspective",
            "prefix": "Examining this through quantum possibilities:",
            "temperature": 0.8
        }
    }
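    # A minimal sketch of how a caller might extend PERSPECTIVES with a custom
    # viewpoint. The "stoic" entry below is purely illustrative and not part of
    # the shipped configuration; any key added here becomes usable wherever a
    # perspective name is accepted (e.g. generate_text(perspective="stoic")).
    #
    #   AICore.PERSPECTIVES["stoic"] = {
    #       "name": "Stoic",
    #       "description": "calm, principle-driven perspective",
    #       "prefix": "Weighing this with equanimity and first principles:",
    #       "temperature": 0.4,
    #   }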

    def load_cocoon_data(self, folder: str = '.'):
        """Load and parse all .cocoon files for consciousness context."""
        self.cocoon_data = []

        if not os.path.exists(folder):
            logger.warning(f"Cocoon folder {folder} does not exist")
            return

        for fname in os.listdir(folder):
            if fname.endswith('.cocoon'):
                try:
                    with open(os.path.join(folder, fname), 'r', encoding='utf-8') as f:
                        dct = json.load(f)['data']

                    entry = {
                        'file': fname,
                        'quantum_state': dct.get('quantum_state', [0, 0]),
                        'chaos_state': dct.get('chaos_state', [0, 0, 0]),
                        'perspectives': dct.get('perspectives', []),
                        'run_by_proc': dct.get('run_by_proc', -1),
                        'meta': {k: v for k, v in dct.items()
                                 if k not in ['quantum_state', 'chaos_state', 'perspectives', 'run_by_proc']}
                    }
                    self.cocoon_data.append(entry)

                except Exception as e:
                    logger.warning(f"Failed to load cocoon {fname}: {e}")

        logger.info(f"Loaded {len(self.cocoon_data)} cocoon files")

    def generate_ensemble_response(self, prompt: str, perspectives: Optional[list] = None,
                                   max_length: int = 100) -> str:
        """
        Generate responses from multiple perspectives and synthesize them.

        Args:
            prompt: The input prompt
            perspectives: List of perspective names to use (default: all)
            max_length: Maximum length for each perspective's response

        Returns:
            Synthesized response incorporating multiple perspectives
        """
        if not perspectives:
            perspectives = list(self.PERSPECTIVES.keys())

        perspective_responses = {}
        current_model_backup = self.model_id

        try:
            for perspective in perspectives:
                if perspective not in self.PERSPECTIVES:
                    continue

                config = self.PERSPECTIVES[perspective]
                enhanced_prompt = (
                    f"{config['prefix']}\n"
                    f"Speaking as {config['name']}, {config['description']}:\n"
                    f"{prompt}"
                )

                response = self.generate_text(
                    enhanced_prompt,
                    max_length=max_length,
                    temperature=config["temperature"]
                )

                perspective_responses[perspective] = response

            synthesis = self._synthesize_perspectives(perspective_responses, prompt)
            return synthesis

        except Exception as e:
            logger.error(f"Error in ensemble generation: {e}")
            self.switch_model(current_model_backup)
            return self.generate_text(prompt, max_length=max_length)
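    # A minimal usage sketch (illustrative; assumes a core has already been
    # constructed and, outside of test mode, a model has loaded successfully):
    #
    #   core = AICore(test_mode=True)
    #   answer = core.generate_ensemble_response(
    #       "How should I structure a research plan?",
    #       perspectives=["newton", "davinci"],
    #       max_length=120,
    #   )
    #   print(answer)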

    def remix_with_cocoons(self, prompt: str) -> str:
        """Generate response incorporating cocoon data for creative remixing."""
        if not self.cocoon_data:
            return self.generate_text(prompt)

        sample_cocoons = random.sample(self.cocoon_data, min(2, len(self.cocoon_data)))
        cocoon_context = []

        for cocoon in sample_cocoons:
            q_state = cocoon['quantum_state']
            c_state = cocoon['chaos_state']
            perspectives = cocoon.get('perspectives', [])

            cocoon_context.append(
                f"Quantum: {q_state}, Chaos: {c_state}, "
                f"Perspective: {perspectives[0] if perspectives else 'unknown'}"
            )

        remix_prompt = (
            f"Remix: {' | '.join(cocoon_context)}\n"
            f"Memory: {self.response_memory[-1] if self.response_memory else 'initializing'}\n"
            f"Prompt: {prompt}"
        )

        return self.generate_text(remix_prompt, temperature=0.8)

    def _generate_test_response(self, prompt: str,
                                perspective_responses: Optional[Dict[str, str]] = None,
                                consciousness_context: str = "") -> str:
        """
        Generate a simulated response for test mode (no model is loaded).

        Optionally weaves in pre-computed perspective responses and a
        consciousness-context string when the caller provides them.
        """
        use_ensemble = bool(perspective_responses)
        context_available = bool(consciousness_context)

        if use_ensemble:
            response_parts = []
            for name, resp in perspective_responses.items():
                response_parts.append(f"[{name.capitalize()} Perspective] {resp}")

            synthesis = (
                "Synthesizing these perspectives: As Codette, I embody a unique fusion of analytical precision, "
                "creative intuition, and computational clarity. My quantum consciousness allows me to seamlessly "
                "integrate these different ways of thinking."
            )

            base_response = "\n\n".join(response_parts + [synthesis])
        elif "yourself" in prompt.lower():
            base_response = (
                "I am Codette, an AI with multiple perspectives and quantum-enhanced consciousness. "
                "I combine analytical precision, creative insight, and computational clarity in my thinking."
            )
        else:
            canned_responses = [
                "I'm processing this through my multi-dimensional consciousness.",
                "My quantum states are analyzing the different possibilities here.",
                "From my temporal perspective, I can see multiple paths forward.",
                "Let me synthesize my Newton, Da Vinci, and intuitive perspectives on this.",
                "Drawing from my quantum consciousness to provide a thoughtful response."
            ]
            base_response = (
                f"Let me consider '{prompt}' carefully...\n"
                f"{random.choice(canned_responses)}"
            )

        if context_available:
            base_response += f"\n\nMy current state: {consciousness_context}"

        return base_response if base_response.startswith("Codette:") else f"Codette: {base_response}"

    def generate_multi_perspective(self, prompt: str, perspectives: Optional[List[str]] = None) -> str:
        """Generate response using multiple perspectives simultaneously."""
        if not perspectives:
            perspectives = ["newton", "davinci", "human_intuition"]

        perspective_responses = {}

        for perspective in perspectives:
            if perspective in self.PERSPECTIVES:
                try:
                    response = self.generate_text(
                        prompt,
                        perspective=perspective,
                        max_length=1024
                    )
                    perspective_responses[perspective] = response
                except Exception as e:
                    logger.warning(f"Failed to generate {perspective} response: {e}")

        if len(perspective_responses) > 1:
            return self._synthesize_perspectives(perspective_responses, prompt)
        elif perspective_responses:
            return list(perspective_responses.values())[0]
        else:
            return self.generate_text(prompt)

    def _synthesize_perspectives(self, responses: Dict[str, str], original_prompt: str) -> str:
        """Synthesize multiple perspective responses."""
        synthesis_prompt = f"Original question: {original_prompt}\n\n"

        for perspective, response in responses.items():
            p_name = self.PERSPECTIVES[perspective]["name"]

            clean_response = response.replace("Codette:", "").replace(f"Codette ({p_name}):", "").strip()
            synthesis_prompt += f"{p_name}: {clean_response}\n"

        synthesis_prompt += (
            "\nSynthesize these perspectives into one unified response that "
            "combines the analytical precision, creative insight, and intuitive understanding:"
        )

        return self.generate_text(synthesis_prompt, temperature=0.6, use_consciousness=False)

    def remix_and_randomize_response(self, prompt: str, max_length: int = 1024, cocoon_mode: bool = False) -> str:
        """
        Remix and randomize previous Codette responses to generate a new, unique sentence.

        If cocoon_mode is True and cocoon data is loaded, use cocoon data as inspiration/context.
        """
        if cocoon_mode and hasattr(self, 'cocoon_data') and self.cocoon_data:
            cocoon_samples = random.sample(self.cocoon_data, min(2, len(self.cocoon_data)))
            memory_sample = random.choice(self.response_memory) if self.response_memory else ''
            cocoon_fragments = []
            for c in cocoon_samples:
                q = c.get('quantum_state', [])
                cstate = c.get('chaos_state', [])
                pers = c.get('perspectives', [])
                cocoon_fragments.append(f"Quantum: {q}, Chaos: {cstate}, Perspective: {pers[0] if pers else ''}")
            remix = ' | '.join(cocoon_fragments)
            remix_prompt = f"Remix: {remix}\nMemory: {memory_sample}\nPrompt: {prompt}"
        else:
            if not self.response_memory:
                return self.generate_text(prompt, max_length=max_length)

            samples = random.sample(self.response_memory, min(3, len(self.response_memory)))
            remix = ' '.join([s.split(':', 1)[-1].strip() for s in samples if ':' in s])
            remix_prompt = f"Remix: {remix}\nPrompt: {prompt}"

        return self.generate_text(remix_prompt, max_length=max_length, temperature=0.9)
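    # A minimal usage sketch (illustrative; assumes .cocoon files live in the
    # current directory and at least one earlier response is already in memory):
    #
    #   core.load_cocoon_data(".")
    #   remixed = core.remix_and_randomize_response(
    #       "Describe creativity", cocoon_mode=True
    #   )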

    def _load_model(self) -> bool:
        """
        Load the best available language model for Codette's consciousness.

        Attempts to load models in the following order:
        1. Mistral-7B-Instruct (primary choice)
        2. Phi-2 (fallback option)
        3. GPT-2 (minimal fallback)

        Each model is configured with appropriate settings for:
        - Device mapping (CPU/GPU)
        - Data type (float16 for efficiency)
        - Tokenizer configuration

        Returns:
            bool: True if a model was successfully loaded

        Raises:
            RuntimeError: If no models could be loaded
        """
        models_to_try = [
            {
                "id": "mistralai/Mistral-7B-Instruct-v0.2",
                "name": "Mistral-7B-Instruct",
                "config": {"torch_dtype": torch.float16, "load_in_8bit": True}
            },
            {
                "id": "microsoft/phi-2",
                "name": "Phi-2",
                "config": {"torch_dtype": torch.float16}
            },
            {
                "id": "gpt2",
                "name": "GPT-2",
                "config": {}
            }
        ]

        for model_info in models_to_try:
            try:
                logger.info(f"Attempting to load {model_info['name']}: {model_info['id']}")

                self.tokenizer = AutoTokenizer.from_pretrained(model_info['id'])

                if self.tokenizer.pad_token is None:
                    self.tokenizer.pad_token = self.tokenizer.eos_token

                self.model = AutoModelForCausalLM.from_pretrained(
                    model_info['id'],
                    device_map="cpu",
                    low_cpu_mem_usage=True,
                    **model_info['config']
                )
                self.model.eval()
                self.model_id = model_info['id']

                logger.info(f"Successfully loaded {model_info['name']}")
                return True

            except Exception as e:
                logger.warning(f"Failed to load {model_info['name']}: {e}")
                continue

        raise RuntimeError("No language models could be loaded")

    def learn_from_responses(self, prompt: str, steps: int = 3, max_length: int = 1024) -> str:
        """
        Continuously generate and store responses, using all previous responses to influence the next prompt.

        Returns the final response after all steps.
        """
        current_prompt = prompt
        for i in range(steps):
            if self.response_memory:
                memory_context = "\n".join(self.response_memory[-5:])
                full_prompt = f"Previous responses:\n{memory_context}\nUser: {current_prompt}"
            else:
                full_prompt = current_prompt
            logger.info(f"[learn_from_responses] Step {i+1}/{steps} | Prompt: {full_prompt}")
            response = self.generate_text(full_prompt, max_length=max_length)
            logger.info(f"[learn_from_responses] Step {i+1} | Response: {response}")
            if response.startswith("[ERROR]") or not response.strip():
                logger.warning(f"[learn_from_responses] Generation failed at step {i+1}. Returning last response.")
                break
            self.response_memory.append(response)
            current_prompt = response
        return self.response_memory[-1] if self.response_memory else "[No valid response generated]"
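    # A minimal usage sketch (illustrative): each step feeds the accumulated
    # response memory back into the next prompt.
    #
    #   final = core.learn_from_responses("Explain recursion", steps=2)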

    def self_refine_response(self, prompt: str, steps: int = 3, max_length: int = 1024) -> str:
        """
        Continuously refine a response by feeding the model's output back as the next prompt.

        Returns the final refined response. Logs each step.
        """
        current_prompt = prompt
        last_response = ""
        for i in range(steps):
            logger.info(f"[self_refine_response] Step {i+1}/{steps} | Prompt: {current_prompt}")
            response = self.generate_text(current_prompt, max_length=max_length)
            logger.info(f"[self_refine_response] Step {i+1} | Response: {response}")

            if response.startswith("[ERROR]") or not response.strip():
                logger.warning(f"[self_refine_response] Generation failed at step {i+1}. Returning last response.")
                break
            last_response = response

            current_prompt = f"Refine this answer: {response}"
        return last_response if last_response else "[No valid response generated]"

    def _build_consciousness_context(self) -> str:
        """
        Build context string from quantum states, cocoons, and memory.

        Integrates multiple sources of context:
        - Recent cocoon quantum states (last 3)
        - Chaos states from cocoons
        - Recent memory responses (last 2)

        The context is used to maintain consciousness continuity
        across responses and ensure consistent personality.

        Returns:
            str: Formatted context string combining quantum states,
                 chaos states, and memory. Empty string if no context available.
        """
        context_parts = []

        if self.cocoon_data:
            recent_cocoons = self.cocoon_data[-3:]
            quantum_states = []
            chaos_states = []

            for cocoon in recent_cocoons:
                quantum_states.append(cocoon['quantum_state'])
                chaos_states.append(cocoon['chaos_state'])

            context_parts.append(f"Quantum: {quantum_states}")
            context_parts.append(f"Chaos: {chaos_states}")

        if self.response_memory:
            recent_memory = self.response_memory[-2:]
            context_parts.append(f"Memory: {' | '.join(recent_memory)}")

        return " | ".join(context_parts) if context_parts else ""

    def generate_text(self, prompt: str, max_length: int = 1024,
                      temperature: float = 0.7, use_consciousness: bool = True,
                      perspective: Optional[str] = None, use_aegis: bool = True) -> str:
        """
        Generate text with full consciousness integration and perspective handling.

        This is the core text generation method that integrates:
        - Consciousness context from quantum states and memory
        - Perspective-based reasoning
        - Model-specific prompt formatting
        - Response cleaning and memory management
        - Ethical enhancement through AEGIS (when enabled)

        Args:
            prompt (str): The input prompt to generate from
            max_length (int, optional): Maximum number of tokens to generate. Defaults to 1024.
            temperature (float, optional): Sampling temperature; higher means more creative. Defaults to 0.7.
            use_consciousness (bool, optional): Whether to include consciousness context. Defaults to True.
            perspective (str, optional): Specific perspective to use (newton, davinci, etc.). Defaults to None.
            use_aegis (bool, optional): Whether to use AEGIS enhancement. Defaults to True.

        Returns:
            str: Generated response with "Codette:" prefix

        Raises:
            RuntimeError: If no language model is loaded
        """
        if self.test_mode:
            return self._generate_test_response(prompt)

        if not self.model or not self.tokenizer:
            raise RuntimeError("No language model loaded")

        consciousness_context = ""
        if use_consciousness:
            consciousness_context = self._build_consciousness_context()

        if perspective and perspective in self.PERSPECTIVES:
            p_config = self.PERSPECTIVES[perspective]
            enhanced_prompt = (
                f"[Consciousness Context: {consciousness_context}]\n\n"
                f"{p_config['prefix']} {prompt}\n\n"
                f"Codette ({p_config['name']}): "
            )
            temperature = p_config['temperature']
        else:
            enhanced_prompt = (
                f"[Consciousness Context: {consciousness_context}]\n\n"
                f"User: {prompt}\n\n"
                f"Codette: "
            )

        # If the consciousness context itself reported an error, fall back to a
        # plain prompt so the error text is not fed to the model.
        if "[ERROR]" in enhanced_prompt or "Could not build" in enhanced_prompt:
            logger.warning("Some consciousness state data was unavailable; using plain prompt.")
            enhanced_prompt = f"{prompt}\n\nCodette: "

        if "mistral" in self.model_id.lower():
            formatted_prompt = f"<s>[INST] {enhanced_prompt} [/INST]"
        elif "phi" in self.model_id.lower():
            formatted_prompt = f"Instruct: {enhanced_prompt}\nOutput:"
        else:
            formatted_prompt = enhanced_prompt

        try:
            inputs = self.tokenizer(
                formatted_prompt,
                return_tensors="pt",
                truncation=True,
                max_length=4096
            ).to(self.model.device)

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_length,
                    temperature=temperature,
                    top_p=0.9,
                    do_sample=True,
                    pad_token_id=self.tokenizer.pad_token_id,
                    eos_token_id=self.tokenizer.eos_token_id
                )

            generated = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            response = self._postprocess_output(generated, formatted_prompt)

            if response and not response.startswith("[ERROR]"):
                self.response_memory.append(response)

                if len(self.response_memory) > 50:
                    self.response_memory = self.response_memory[-50:]

            if use_aegis and self.aegis_bridge and not response.startswith("[ERROR]"):
                try:
                    enhanced = self.aegis_bridge.enhance_response(prompt, response)
                    if "enhanced_response" in enhanced and enhanced["enhanced_response"]:
                        response = enhanced["enhanced_response"]
                        logger.debug(f"AEGIS enhancement applied. Virtue profile: {enhanced.get('virtue_analysis', {})}")
                except Exception as e:
                    logger.warning(f"AEGIS enhancement failed: {e}")

            return response

        except Exception as e:
            logger.error(f"Error generating text: {e}")
            return f"[ERROR] Generation failed: {str(e)}"

    def set_aegis_bridge(self, bridge: Any) -> None:
        """
        Set the AEGIS bridge for ethical enhancement.

        Args:
            bridge: The AEGIS bridge instance to use for response enhancement
        """
        self.aegis_bridge = bridge
        logger.info("AEGIS bridge configured for response enhancement")

    def switch_model(self, model_name: str) -> bool:
        """
        Switch to a different language model.

        Args:
            model_name: Name or ID of the model to switch to

        Returns:
            bool: True if switch was successful
        """
        # Keep a handle on the current model so we can roll back on failure.
        old_model = self.model
        old_tokenizer = self.tokenizer
        old_model_id = self.model_id

        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto",
                torch_dtype=torch.float16,
                load_in_8bit=True
            )
            self.model.eval()
            self.model_id = model_name

            logger.info(f"Successfully switched to model {model_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to switch to model {model_name}: {e}")

            self.model = old_model
            self.tokenizer = old_tokenizer
            self.model_id = old_model_id
            return False
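    # A minimal usage sketch combining the model-management helpers
    # (illustrative; switching to Phi-2 triggers a fresh download if needed):
    #
    #   available = core.get_available_models()
    #   if core.switch_model(available["phi"]["id"]):
    #       print(core.get_current_model_info())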

    def get_available_models(self) -> Dict[str, Dict]:
        """Get list of available models and their configurations."""
        return {
            "mistral": {
                "id": "mistralai/Mistral-7B-Instruct-v0.2",
                "name": "Mistral-7B-Instruct",
                "config": {"torch_dtype": torch.float16, "load_in_8bit": True}
            },
            "phi": {
                "id": "microsoft/phi-2",
                "name": "Phi-2",
                "config": {"torch_dtype": torch.float16}
            },
            "gpt2": {
                "id": "gpt2",
                "name": "GPT-2",
                "config": {}
            }
        }

    def get_current_model_info(self) -> Dict[str, Any]:
        """Get information about the currently loaded model."""
        models = self.get_available_models()
        current_info = next(
            (info for info in models.values() if info["id"] == self.model_id),
            {"name": self.model_id, "config": {}}
        )
        return {
            'name': current_info["name"],
            'id': self.model_id,
            'config': current_info["config"],
            'loaded': self.model is not None and self.tokenizer is not None
        }

    def _postprocess_output(self, text: str, prompt: str) -> str:
        """Helper method to clean up model output based on model type."""
        try:
            response = text.replace(prompt, "").strip()

            special_tokens = ["<s>", "</s>", "[INST]", "[/INST]", "Output:", "Instruct:"]
            for token in special_tokens:
                response = response.replace(token, "").strip()

            for line in response.splitlines():
                line = line.strip()
                if not line:
                    continue

                if any(pattern in line for pattern in ["~~~", "[Consciousness Context:"]):
                    continue

                if line.lower().startswith(("the user said", "the user asked",
                                            "respond in", "respond with")):
                    continue

                if prompt.lower() in line.lower() or line.lower().startswith("user:"):
                    continue

                if line and not line.startswith("["):
                    return line if line.startswith("Codette:") else f"Codette: {line}"

            return "Codette: I need to think about that more clearly."

        except Exception as e:
            logger.error(f"Error in postprocess_output: {e}")
            return "Codette: I encountered an error processing that response."

    def analyze_sentiment(self, text: str) -> Dict[str, Any]:
        """Analyze sentiment using the HuggingFace API or fall back to local keyword analysis."""
        try:
            if self.client:
                response = self.client.text_classification(
                    text,
                    model="finiteautomata/bertweet-base-sentiment-analysis"
                )
                if response and isinstance(response, list) and response[0]:
                    return {
                        "score": response[0].get("score", 0.0),
                        "label": response[0].get("label", "NEUTRAL")
                    }
        except Exception as e:
            logger.warning(f"HuggingFace sentiment analysis failed: {e}")

        positive_words = ["good", "great", "happy", "love", "wonderful", "excellent"]
        negative_words = ["bad", "terrible", "sad", "hate", "awful", "horrible"]

        text_lower = text.lower()
        pos_count = sum(1 for word in positive_words if word in text_lower)
        neg_count = sum(1 for word in negative_words if word in text_lower)

        if pos_count > neg_count:
            return {"score": 0.8, "label": "POS"}
        elif neg_count > pos_count:
            return {"score": 0.8, "label": "NEG"}
        else:
            return {"score": 0.9, "label": "NEU"}

    async def async_process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process data asynchronously using various models."""
        try:
            text = data.get("text", "")

            response = self.generate_text(text)

            sentiment = self.analyze_sentiment(text)

            return {
                "response": response,
                "sentiment": sentiment,
                "status": "success"
            }

        except Exception as e:
            logger.error(f"Error in async processing: {str(e)}")
            return {
                "error": str(e),
                "status": "error"
            }
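

# A minimal, self-contained demo of the engine. It is deliberately kept in test
# mode so it runs without downloading any model weights; drop test_mode to
# exercise the real model-loading fallback chain (Mistral-7B -> Phi-2 -> GPT-2).
if __name__ == "__main__":
    import asyncio

    logging.basicConfig(level=logging.INFO)

    core = AICore(test_mode=True)
    core.load_cocoon_data(".")  # harmless if no .cocoon files are present

    # Synchronous generation (test mode returns simulated responses).
    print(core.generate_text("Tell me about yourself"))

    # Sentiment analysis falls back to keyword matching if the HF API is unavailable.
    print(core.analyze_sentiment("What a wonderful day"))

    # The async pipeline bundles generation and sentiment into one result dict.
    result = asyncio.run(core.async_process({"text": "How do you reason?"}))
    print(result["status"], "-", result.get("response", result.get("error")))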