#!/usr/bin/env python3
"""
Enhanced RAG News Manager for Google Drive
Saves high-confidence news (95%+ from Gemini) to Google Drive for RAG purposes
"""

import hashlib
import io
import json
import os
from datetime import datetime

# The Google client libraries are imported defensively so that this module can
# still be imported when they are not installed; in that case authenticate()
# reports the problem and returns False, like every other failure path here.
try:
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import InstalledAppFlow
    from google.auth.transport.requests import Request
    from googleapiclient.discovery import build
    from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload
except ImportError as _import_error:
    Credentials = InstalledAppFlow = Request = build = None
    MediaIoBaseDownload = MediaIoBaseUpload = None
    _GOOGLE_IMPORT_ERROR = _import_error
else:
    _GOOGLE_IMPORT_ERROR = None

# Configuration
SCOPES = ['https://www.googleapis.com/auth/drive.file']
RAG_FOLDER_NAME = "Vietnamese_Fake_News_RAG"
RAG_FILE_NAME = "high_confidence_news.json"
CONFIDENCE_THRESHOLD = 0.95  # 95% threshold


class RAGNewsManager:
    """Store and query high-confidence news entries in a JSON file on Google Drive.

    The whole store is a single JSON document of the form
    ``{"metadata": {...}, "news_entries": [...]}`` that is downloaded and
    re-uploaded in full on every read/write.

    Public methods never raise: failures are printed and signalled through the
    return value (``False``, ``[]``, ``None`` or an empty store).
    """

    def __init__(self):
        """Create an unauthenticated manager; call initialize() before use."""
        self.service = None          # Drive API client, set by authenticate()
        self.rag_folder_id = None    # Drive id of the RAG folder
        self.rag_file_id = None      # Drive id of the RAG JSON file
        self.credentials_file = 'credentials.json'
        self.token_file = 'token.json'

    @staticmethod
    def _empty_rag_data():
        """Return a fresh, empty store structure (a new dict on every call)."""
        return {"metadata": {"total_entries": 0}, "news_entries": []}

    @staticmethod
    def _entry_matches(entry, query_lower):
        """Return True if *query_lower* occurs in the entry's text or analysis.

        Args:
            entry: One stored news entry (a dict).
            query_lower: The already lower-cased search string.

        Missing or ``None`` fields are treated as empty strings so that one
        incomplete entry cannot abort an entire search.
        """
        news_text = (entry.get('news_text') or '').lower()
        analysis = (entry.get('gemini_analysis') or '').lower()
        return query_lower in news_text or query_lower in analysis

    def authenticate(self):
        """Authenticate with the Google Drive API and build ``self.service``.

        When the ``SPACE_ID`` environment variable is set (Hugging Face
        Spaces), credentials are built from ``GOOGLE_CLIENT_ID``,
        ``GOOGLE_CLIENT_SECRET`` and ``GOOGLE_REFRESH_TOKEN``. Otherwise the
        local ``token.json`` is used, falling back to the interactive OAuth
        flow driven by ``credentials.json``.

        Returns:
            True on success, False on any failure (the reason is printed).
        """
        if _GOOGLE_IMPORT_ERROR is not None:
            print(f"❌ Google Drive authentication failed: {_GOOGLE_IMPORT_ERROR}")
            return False

        try:
            creds = None

            # Check if running on Hugging Face Spaces
            is_hf_space = os.getenv('SPACE_ID') is not None

            if is_hf_space:
                # For Hugging Face Spaces, use environment variables
                client_id = os.getenv('GOOGLE_CLIENT_ID')
                client_secret = os.getenv('GOOGLE_CLIENT_SECRET')
                refresh_token = os.getenv('GOOGLE_REFRESH_TOKEN')

                if client_id and client_secret and refresh_token:
                    creds = Credentials.from_authorized_user_info({
                        'client_id': client_id,
                        'client_secret': client_secret,
                        'refresh_token': refresh_token,
                        'token_uri': 'https://oauth2.googleapis.com/token'
                    }, SCOPES)
                else:
                    print("⚠️ Google Drive credentials not found in Hugging Face secrets")
                    return False
            else:
                # For local development, use files
                if os.path.exists(self.token_file):
                    creds = Credentials.from_authorized_user_file(self.token_file, SCOPES)

            # If no valid credentials, request authorization
            if not creds or not creds.valid:
                # Refresh whenever a refresh token is available. Credentials
                # that carry only a refresh token have no access token yet and
                # are not necessarily reported as "expired", so `expired` is
                # deliberately not required here.
                if creds and creds.refresh_token:
                    creds.refresh(Request())
                elif os.path.exists(self.credentials_file):
                    flow = InstalledAppFlow.from_client_secrets_file(
                        self.credentials_file, SCOPES)
                    creds = flow.run_local_server(port=0)
                else:
                    print("⚠️ credentials.json not found for local development")
                    return False

                # Save credentials for next run
                with open(self.token_file, 'w') as token:
                    token.write(creds.to_json())

            self.service = build('drive', 'v3', credentials=creds)
            print("✅ Google Drive authentication successful!")
            return True

        except Exception as e:
            print(f"❌ Google Drive authentication failed: {e}")
            return False

    def setup_rag_folder(self):
        """Find the RAG folder in Google Drive, creating it if necessary.

        Sets ``self.rag_folder_id``.

        Returns:
            True on success, False if the Drive request failed.
        """
        try:
            # Check if folder already exists. `trashed = false` keeps a folder
            # that was moved to the trash from being reused.
            query = (
                f"name='{RAG_FOLDER_NAME}' "
                "and mimeType='application/vnd.google-apps.folder' "
                "and trashed = false"
            )
            results = self.service.files().list(
                q=query,
                fields="files(id, name)"
            ).execute()

            folders = results.get('files', [])

            if folders:
                self.rag_folder_id = folders[0]['id']
                print(f"✅ Found existing RAG folder: {RAG_FOLDER_NAME}")
            else:
                # Create new folder
                folder_metadata = {
                    'name': RAG_FOLDER_NAME,
                    'mimeType': 'application/vnd.google-apps.folder'
                }
                folder = self.service.files().create(
                    body=folder_metadata,
                    fields='id'
                ).execute()
                self.rag_folder_id = folder.get('id')
                print(f"✅ Created new RAG folder: {RAG_FOLDER_NAME}")

            return True

        except Exception as e:
            print(f"❌ Error setting up RAG folder: {e}")
            return False

    def setup_rag_file(self):
        """Find the RAG data file inside the RAG folder, creating it if necessary.

        Sets ``self.rag_file_id``. A newly created file is seeded with an empty
        store and descriptive metadata.

        Returns:
            True on success, False if the Drive request failed.
        """
        try:
            # Check if file already exists (documented query form:
            # "'<folder id>' in parents"), ignoring trashed copies.
            query = (
                f"name='{RAG_FILE_NAME}' "
                f"and '{self.rag_folder_id}' in parents "
                "and trashed = false"
            )
            results = self.service.files().list(
                q=query,
                fields="files(id, name)"
            ).execute()

            files = results.get('files', [])

            if files:
                self.rag_file_id = files[0]['id']
                print(f"✅ Found existing RAG file: {RAG_FILE_NAME}")
            else:
                # Create new file with empty data
                initial_data = {
                    "metadata": {
                        "created_at": datetime.now().isoformat(),
                        "description": "High-confidence Vietnamese fake news for RAG",
                        "threshold": CONFIDENCE_THRESHOLD,
                        "total_entries": 0
                    },
                    "news_entries": []
                }

                file_metadata = {
                    'name': RAG_FILE_NAME,
                    'parents': [self.rag_folder_id]
                }

                payload = json.dumps(initial_data, ensure_ascii=False, indent=2)
                media = MediaIoBaseUpload(
                    io.BytesIO(payload.encode('utf-8')),
                    mimetype='application/json'
                )

                file = self.service.files().create(
                    body=file_metadata,
                    media_body=media,
                    fields='id'
                ).execute()

                self.rag_file_id = file.get('id')
                print(f"✅ Created new RAG file: {RAG_FILE_NAME}")

            return True

        except Exception as e:
            print(f"❌ Error setting up RAG file: {e}")
            return False

    def _download_rag_data(self):
        """Download and parse the RAG file, raising on any failure.

        Unlike load_rag_data(), errors propagate to the caller so that write
        paths can tell "the store is empty" apart from "the store could not be
        read".
        """
        request = self.service.files().get_media(fileId=self.rag_file_id)
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(buffer, request)

        done = False
        while not done:
            _, done = downloader.next_chunk()

        data = json.loads(buffer.getvalue().decode('utf-8'))
        print(f"📚 Loaded {data.get('metadata', {}).get('total_entries', 0)} entries from RAG file")
        return data

    def load_rag_data(self):
        """Load existing RAG data from Google Drive.

        Returns:
            The parsed store. If no file id is set, or the download/parse
            fails, an empty store ``{"metadata": {"total_entries": 0},
            "news_entries": []}`` is returned instead (errors are printed).
        """
        try:
            if not self.rag_file_id:
                return self._empty_rag_data()
            return self._download_rag_data()

        except Exception as e:
            print(f"❌ Error loading RAG data: {e}")
            return self._empty_rag_data()

    def save_rag_data(self, data):
        """Upload *data* as the new content of the RAG file.

        ``metadata.last_updated`` and ``metadata.total_entries`` are refreshed
        in place on *data* before uploading.

        Args:
            data: The full store dict to write.

        Returns:
            True if the upload succeeded, False if no file id is set or the
            upload failed.
        """
        try:
            if not self.rag_file_id:
                return False

            # Tolerate payloads that lack either top-level key.
            entries = data.setdefault('news_entries', [])
            metadata = data.setdefault('metadata', {})

            # Update metadata
            metadata['last_updated'] = datetime.now().isoformat()
            metadata['total_entries'] = len(entries)

            # Convert to JSON
            json_data = json.dumps(data, ensure_ascii=False, indent=2)
            media = MediaIoBaseUpload(
                io.BytesIO(json_data.encode('utf-8')),
                mimetype='application/json'
            )

            # Update the file
            self.service.files().update(
                fileId=self.rag_file_id,
                media_body=media
            ).execute()

            print(f"✅ Saved {len(entries)} entries to RAG file")
            return True

        except Exception as e:
            print(f"❌ Error saving RAG data: {e}")
            return False

    def add_high_confidence_news(self, news_text, gemini_analysis, gemini_confidence,
                                 prediction, search_results=None, distilbert_confidence=None):
        """Add a news item to the RAG store if its confidence meets the threshold.

        Args:
            news_text: The news text; its MD5 digest is the deduplication key.
            gemini_analysis: Analysis text stored alongside the entry.
            gemini_confidence: Confidence as a fraction (0.95 means 95%).
            prediction: Prediction label stored with the entry.
            search_results: Optional search results; stored as ``[]`` if falsy.
            distilbert_confidence: Optional second confidence value.

        Returns:
            True if the entry was stored. False if the confidence is below
            CONFIDENCE_THRESHOLD, the text is already stored, the store is not
            initialized, or reading/writing the store failed.
        """
        try:
            # Check confidence threshold
            if gemini_confidence < CONFIDENCE_THRESHOLD:
                print(f"⚠️ Confidence {gemini_confidence:.1%} below threshold {CONFIDENCE_THRESHOLD:.1%}")
                return False

            # Create content hash for deduplication. MD5 is used purely as a
            # fingerprint; switching algorithms would stop new items from
            # matching hashes that are already stored.
            content_hash = hashlib.md5(news_text.encode('utf-8')).hexdigest()

            # Without a file id nothing can be saved.
            if not self.rag_file_id:
                return False

            # Load existing data with the raising loader: if the download
            # fails we must abort (via the handler below) rather than continue
            # with an empty store and overwrite every existing entry on save.
            data = self._download_rag_data()
            entries = data.setdefault('news_entries', [])

            # Check if entry already exists
            if any(entry.get('content_hash') == content_hash for entry in entries):
                print(f"⚠️ News already exists in RAG (hash: {content_hash[:8]}...)")
                return False

            # Create new entry
            new_entry = {
                'id': len(entries) + 1,
                'content_hash': content_hash,
                'news_text': news_text,
                'prediction': prediction,
                'gemini_confidence': gemini_confidence,
                'gemini_analysis': gemini_analysis,
                'distilbert_confidence': distilbert_confidence,
                'search_results': search_results or [],
                'created_at': datetime.now().isoformat(),
                'source': 'user_input',
                'verified': True  # High confidence means verified
            }

            # Add to data
            entries.append(new_entry)

            # Save to Google Drive
            if not self.save_rag_data(data):
                return False

            print(f"✅ Added high-confidence news to RAG:")
            print(f" 📰 News: {news_text[:100]}...")
            print(f" 🎯 Prediction: {prediction}")
            print(f" 📊 Confidence: {gemini_confidence:.1%}")
            print(f" 🔗 Hash: {content_hash[:8]}...")
            return True

        except Exception as e:
            print(f"❌ Error adding news to RAG: {e}")
            return False

    def search_rag_news(self, query_text, limit=5):
        """Search stored entries whose text or analysis contains *query_text*.

        Matching is a case-insensitive substring test.

        Args:
            query_text: The text to look for.
            limit: Maximum number of results to return.

        Returns:
            Up to *limit* result dicts (``news_text``, ``prediction``,
            ``confidence``, ``analysis``, ``created_at``, ``id``), ordered by
            confidence and then creation time, both descending. ``[]`` if
            nothing matches or an error occurs.
        """
        try:
            data = self.load_rag_data()
            entries = data.get('news_entries') or []

            if not entries:
                return []

            query_lower = query_text.lower()

            # Simple text similarity search
            results = [
                {
                    'news_text': entry['news_text'],
                    'prediction': entry['prediction'],
                    'confidence': entry['gemini_confidence'],
                    'analysis': entry['gemini_analysis'],
                    'created_at': entry['created_at'],
                    'id': entry['id']
                }
                for entry in entries
                if self._entry_matches(entry, query_lower)
            ]

            # Sort by confidence and creation date
            results.sort(key=lambda x: (x['confidence'], x['created_at']), reverse=True)
            results = results[:limit]

            if results:
                print(f"🔍 Found {len(results)} similar entries in RAG")

            return results

        except Exception as e:
            print(f"❌ Error searching RAG news: {e}")
            return []

    def get_rag_statistics(self):
        """Summarize the stored entries.

        Returns:
            A dict with ``total_entries``, ``real_count``, ``fake_count``,
            ``avg_confidence``, ``latest_entry``, ``folder_id`` and
            ``file_id``; or None if computing the statistics failed.
        """
        try:
            data = self.load_rag_data()
            entries = data['news_entries']

            if not entries:
                return {
                    'total_entries': 0,
                    'real_count': 0,
                    'fake_count': 0,
                    'avg_confidence': 0,
                    'latest_entry': None,
                    'folder_id': self.rag_folder_id,
                    'file_id': self.rag_file_id
                }

            real_count = sum(1 for entry in entries if entry['prediction'] == 'REAL')
            fake_count = sum(1 for entry in entries if entry['prediction'] == 'FAKE')
            avg_confidence = sum(entry['gemini_confidence'] for entry in entries) / len(entries)

            # Get latest entry (entries is non-empty at this point)
            latest_entry = max(entries, key=lambda x: x['created_at'])

            return {
                'total_entries': len(entries),
                'real_count': real_count,
                'fake_count': fake_count,
                'avg_confidence': avg_confidence,
                'latest_entry': latest_entry,
                'folder_id': self.rag_folder_id,
                'file_id': self.rag_file_id
            }

        except Exception as e:
            print(f"❌ Error getting RAG statistics: {e}")
            return None

    def initialize(self):
        """Authenticate, then locate or create the RAG folder and file.

        Returns:
            True if every step succeeded, False as soon as one step fails.
        """
        print("🚀 Initializing RAG News Manager...")

        if not self.authenticate():
            return False

        if not self.setup_rag_folder():
            return False

        if not self.setup_rag_file():
            return False

        print("✅ RAG News Manager initialized successfully!")
        return True


# Global instance
rag_manager = RAGNewsManager()


def initialize_rag_system():
    """Initialize the shared RAG manager; return True on success."""
    return rag_manager.initialize()


def add_news_to_rag(news_text, gemini_analysis, gemini_confidence, prediction,
                    search_results=None, distilbert_confidence=None):
    """Add news to the shared RAG store if its confidence is high enough."""
    return rag_manager.add_high_confidence_news(
        news_text, gemini_analysis, gemini_confidence, prediction,
        search_results, distilbert_confidence
    )


def search_rag_for_context(query_text, limit=3):
    """Search the shared RAG store for entries to use as analysis context."""
    return rag_manager.search_rag_news(query_text, limit)


def get_rag_stats():
    """Return statistics for the shared RAG store (None on failure)."""
    return rag_manager.get_rag_statistics()


if __name__ == "__main__":
    # Test the RAG system
    print("Testing RAG News Manager...")

    if initialize_rag_system():
        # Test adding a news entry
        test_news = "Argentina vô địch World Cup 2022 là sự thật"
        test_analysis = "1. KẾT LUẬN: THẬT\n2. ĐỘ TIN CẬY: THẬT: 98% / GIẢ: 2%"
        test_confidence = 0.98

        success = add_news_to_rag(
            news_text=test_news,
            gemini_analysis=test_analysis,
            gemini_confidence=test_confidence,
            prediction="REAL"
        )

        if success:
            print("✅ Test news added successfully!")

            # Get statistics
            stats = get_rag_stats()
            if stats:
                print(f"📊 RAG Statistics:")
                print(f" Total entries: {stats['total_entries']}")
                print(f" Real news: {stats['real_count']}")
                print(f" Fake news: {stats['fake_count']}")
                print(f" Average confidence: {stats['avg_confidence']:.1%}")
                print(f" Google Drive folder ID: {stats['folder_id']}")
                print(f" Google Drive file ID: {stats['file_id']}")
        else:
            print("❌ Failed to add test news")
    else:
        print("❌ Failed to initialize RAG system")