"""
Cache manager for storing and retrieving agent answers.
"""

import os
import json
import hashlib
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime


class CacheManager:
    """Manages caching of agent answers to avoid redundant processing.

    Every question is stored in its own JSON file named
    ``question_<hash>.json`` inside ``cache_dir``.  A file holds the question
    text, the full history of submitted answers (``answers``), the optional
    ``file_name`` and a ``cache_valid`` flag describing the most recent
    answer.
    """

    # Lower-case substrings that mark an answer as an error message.  The
    # order is significant only for which pattern gets reported first.
    _ERROR_PATTERNS = (
        'error calling llm',
        'error running agent',
        'error in',
        'error processing',
        'litellm.badrequest',
        'litellm.exception',
        'vertexaiexception',
        'badrequest',
        'invalid_argument',
        'authentication',
        'credentials',
        'api key',
        'traceback',
        'exception occurred',
        'failed to',
        'unable to submit',
        'mimetype parameter',
        'not supported',
    )

    # Answers that are exactly one of these strings are treated as corrupt.
    _CORRUPT_ANSWERS = frozenset(
        [']', '[', '{}', '()', '""', "''", 'null', 'undefined']
    )

    def __init__(self, cache_dir: str = "cache"):
        self.cache_dir = cache_dir
        self.ensure_cache_dir()

    def ensure_cache_dir(self):
        """Create cache directory if it doesn't exist."""
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.cache_dir, exist_ok=True)

    def _get_question_hash(self, question: str) -> str:
        """Generate a hash for the question to use as filename."""
        # The first 12 hex digits of the MD5 digest are part of the on-disk
        # file name; changing this would orphan existing cache files.
        return hashlib.md5(question.encode('utf-8')).hexdigest()[:12]

    def _get_cache_path(self, question: str) -> str:
        """Get the cache file path for a question."""
        question_hash = self._get_question_hash(question)
        return os.path.join(self.cache_dir, f"question_{question_hash}.json")

    def _list_cache_files(self) -> List[Tuple[str, str]]:
        """Return ``(filename, path)`` for every cache entry file, sorted by name."""
        return [
            (filename, os.path.join(self.cache_dir, filename))
            for filename in sorted(os.listdir(self.cache_dir))
            if filename.startswith('question_') and filename.endswith('.json')
        ]

    @staticmethod
    def _read_cache_file(cache_path: str) -> Any:
        """Load and return the JSON document stored at ``cache_path``."""
        with open(cache_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _write_cache_file(cache_path: str, data: Any) -> None:
        """Write ``data`` as JSON to ``cache_path`` without leaving a partial file.

        The document is written to a sibling temporary file first and then
        moved over the destination, so a failed dump leaves the previous
        content of ``cache_path`` untouched.
        """
        tmp_path = f"{cache_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, cache_path)
        finally:
            # After a successful replace the temporary file no longer exists;
            # this only removes the leftover of a failed write.
            if os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass

    def get_cached_answer(self, question: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached answer for a question.

        Args:
            question: The question to look up

        Returns:
            Dictionary with the most recent answer, its iterations and
            timestamp, plus the entry's ``cache_valid`` and ``file_name``,
            or None if nothing is cached or the file cannot be read.
        """
        cache_path = self._get_cache_path(question)

        try:
            data = self._read_cache_file(cache_path)
        except FileNotFoundError:
            return None
        except Exception as e:
            print(f"Error reading cache: {e}")
            return None

        try:
            answers = data.get('answers', [])
            if not answers:
                return None
            last_answer = answers[-1]
            return {
                'answer': last_answer.get('answer', ''),
                'iterations': last_answer.get('iterations', 0),
                'timestamp': last_answer.get('timestamp', ''),
                'cache_valid': data.get('cache_valid', False),
                'file_name': data.get('file_name', None)
            }
        except Exception as e:
            # Unexpected document shape (e.g. not a dict) is reported, not raised.
            print(f"Error reading cache: {e}")
            return None

    def cache_answer(self, question: str, answer: Optional[str],
                     iterations: int = 1,
                     file_name: Optional[str] = None) -> bool:
        """
        Cache an answer for a question with iteration count.

        The answer is always appended to the entry's history.  Answers that
        fail validation are stored too (an empty string if there is no
        answer), but the entry is then marked ``cache_valid = False``.

        Args:
            question: The question that was asked
            answer: The answer to cache
            iterations: Number of iterations/steps used (should be 1-10 typically)
            file_name: Optional file name associated with the question; when
                given it replaces the stored one

        Returns:
            True if cached successfully, False otherwise
        """
        cache_path = self._get_cache_path(question)
        cache_valid = bool(answer and self.validate_answer_content(answer))
        now = datetime.now().isoformat()

        try:
            # The directory may have been removed since construction.
            self.ensure_cache_dir()

            if os.path.exists(cache_path):
                data = self._read_cache_file(cache_path)
            else:
                data = {
                    'question': question,
                    'answers': [],
                    'cache_valid': False,
                    'file_name': file_name
                }

            # Always update file_name for logging
            if file_name:
                data['file_name'] = file_name
            print(f"[CacheManager] file_name submitted: {file_name}")

            # An existing file may lack a usable history; start one instead
            # of failing every future write for this question.
            answers = data.get('answers')
            if not isinstance(answers, list):
                answers = []
                data['answers'] = answers

            # A valid answer is stored as-is; a missing one becomes "".
            answers.append({
                'answer': answer if answer else "",
                'iterations': iterations,
                'timestamp': now
            })
            data['cache_valid'] = cache_valid

            self._write_cache_file(cache_path, data)
            return True

        except Exception as e:
            print(f"Error caching answer: {e}")
            return False

    def validate_answer_content(self, answer: str) -> bool:
        """
        Validate that answer content is reasonable to cache.
        Error messages and corrupted responses should NOT be cached as valid.

        Args:
            answer: The answer content to validate

        Returns:
            True if answer is valid to cache, False otherwise
        """
        if not answer or not isinstance(answer, str):
            return False

        clean_answer = answer.strip()
        if len(clean_answer) < 3:
            return False

        # Check for error patterns (case insensitive) - these should NEVER
        # be cached as valid answers.
        lower_answer = clean_answer.lower()
        for pattern in self._ERROR_PATTERNS:
            if pattern in lower_answer:
                print(f"[CacheManager] Rejecting answer containing error pattern: '{pattern}'")
                return False

        # Check for corrupt/empty patterns
        if clean_answer in self._CORRUPT_ANSWERS:
            return False

        # Check if answer consists of bracket characters only
        if all(c in '[]{}()' for c in clean_answer):
            return False

        return True

    def clear_cache(self):
        """Clear all cached answers.

        Removes every regular file directly inside the cache directory;
        sub-directories are left alone.  Errors are printed, not raised.
        """
        try:
            for filename in os.listdir(self.cache_dir):
                file_path = os.path.join(self.cache_dir, filename)
                if os.path.isfile(file_path):
                    os.remove(file_path)
            print("Cache cleared successfully")
        except Exception as e:
            print(f"Error clearing cache: {e}")

    def list_cached_questions(self) -> List[Dict[str, Any]]:
        """List all cached questions with metadata.

        Returns:
            One dict per readable cache file with ``question``,
            ``cache_valid``, ``file_name`` and ``last_timestamp`` (None when
            the entry has no answers), newest first.  Unreadable files are
            reported and skipped.
        """
        cached_questions: List[Dict[str, Any]] = []

        try:
            cache_files = self._list_cache_files()
        except Exception as e:
            print(f"Error listing cached questions: {e}")
            return cached_questions

        for filename, cache_path in cache_files:
            # Handle each file separately so one bad file does not hide the rest.
            try:
                data = self._read_cache_file(cache_path)
                answers = data.get('answers') or []
                cached_questions.append({
                    'question': data.get('question', ''),
                    'cache_valid': data.get('cache_valid', False),
                    'file_name': data.get('file_name', None),
                    'last_timestamp': answers[-1].get('timestamp') if answers else None
                })
            except Exception as e:
                print(f"Error listing cached questions: {filename}: {e}")

        # last_timestamp may be None; map it to '' so it never gets compared
        # with a string (which would raise TypeError).
        return sorted(
            cached_questions,
            key=lambda entry: str(entry.get('last_timestamp') or ''),
            reverse=True,
        )

    def cleanup_invalid_cache_entries(self) -> int:
        """
        Clean up cache entries whose current answer is an error message or
        otherwise invalid content.

        Only the most recent answer is checked: it is the one
        ``get_cached_answer`` returns and the one ``cache_valid`` describes,
        while older history entries may legitimately be rejected attempts.
        Affected entries are marked ``cache_valid = False`` rather than
        deleted, to preserve history.

        Returns:
            Number of entries cleaned up
        """
        cleaned_count = 0

        try:
            cache_files = self._list_cache_files()
        except Exception as e:
            print(f"Error during cache cleanup: {e}")
            cache_files = []

        for filename, cache_path in cache_files:
            try:
                data = self._read_cache_file(cache_path)

                # Entries already marked invalid need no work.
                if not data.get('cache_valid', False):
                    continue

                answers = data.get('answers') or []
                if not answers:
                    continue

                answer_text = answers[-1].get('answer', '')
                if self.validate_answer_content(answer_text):
                    continue

                # str() keeps the preview safe for non-string answers.
                print(f"Found invalid cached answer in {filename}: {str(answer_text)[:100]}...")

                # Mark as invalid instead of deleting to preserve history
                data['cache_valid'] = False
                self._write_cache_file(cache_path, data)

                cleaned_count += 1
                print(f"Cleaned up invalid cache entry: {filename}")

            except Exception as e:
                print(f"Error processing cache file {filename}: {e}")
                continue

        print(f"Cache cleanup completed. {cleaned_count} entries cleaned up.")
        return cleaned_count