#!/usr/bin/env python3
"""
Enhanced RAG News Manager for Google Drive
Saves high-confidence news (95%+ from Gemini) to Google Drive for RAG purposes
"""
| import json | |
| import os | |
| import hashlib | |
| from datetime import datetime | |
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from google.auth.transport.requests import Request | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload | |
| import io | |
# Configuration
# OAuth scope requested from Google Drive.
SCOPES = ['https://www.googleapis.com/auth/drive.file']
# Name of the Drive folder that holds the RAG data file.
RAG_FOLDER_NAME = "Vietnamese_Fake_News_RAG"
# Name of the JSON file (inside the folder above) that stores all entries.
RAG_FILE_NAME = "high_confidence_news.json"
# Minimum Gemini confidence (as a 0-1 fraction) required to store an entry.
CONFIDENCE_THRESHOLD = 0.95  # 95% threshold
class RAGNewsManager:
    """Store and query high-confidence news verdicts in a Google Drive JSON file.

    The manager keeps a single JSON document (``RAG_FILE_NAME``) inside a
    Drive folder (``RAG_FOLDER_NAME``). The document has a ``metadata`` dict
    and a ``news_entries`` list. Public methods report failure through their
    return value (``False``, ``[]``, ``None`` or an empty dataset) and print a
    diagnostic instead of raising.
    """

    def __init__(self):
        """Create an unauthenticated manager; call ``initialize()`` before use."""
        self.service = None
        self.rag_folder_id = None
        self.rag_file_id = None
        self.credentials_file = 'credentials.json'
        self.token_file = 'token.json'

    @staticmethod
    def _empty_rag_data():
        """Return a fresh, empty dataset in the shape the other methods expect."""
        return {"metadata": {"total_entries": 0}, "news_entries": []}

    def authenticate(self):
        """Authenticate with the Google Drive API and build ``self.service``.

        When the ``SPACE_ID`` environment variable is set, credentials are
        read from ``GOOGLE_CLIENT_ID``, ``GOOGLE_CLIENT_SECRET`` and
        ``GOOGLE_REFRESH_TOKEN``. Otherwise the local ``token.json`` /
        ``credentials.json`` files are used and the token file is rewritten
        after a refresh or a new browser authorization.

        Returns:
            True on success, False if credentials are missing or any step fails.
        """
        try:
            creds = None
            # Check if running on Hugging Face Spaces
            is_hf_space = os.getenv('SPACE_ID') is not None
            if is_hf_space:
                # For Hugging Face Spaces, use environment variables
                client_id = os.getenv('GOOGLE_CLIENT_ID')
                client_secret = os.getenv('GOOGLE_CLIENT_SECRET')
                refresh_token = os.getenv('GOOGLE_REFRESH_TOKEN')
                if not (client_id and client_secret and refresh_token):
                    print("⚠️ Google Drive credentials not found in Hugging Face secrets")
                    return False
                creds = Credentials.from_authorized_user_info({
                    'client_id': client_id,
                    'client_secret': client_secret,
                    'refresh_token': refresh_token,
                    'token_uri': 'https://oauth2.googleapis.com/token'
                }, SCOPES)
            else:
                # For local development, use files
                if os.path.exists(self.token_file):
                    creds = Credentials.from_authorized_user_file(self.token_file, SCOPES)
                # If no valid credentials, request authorization
                if not creds or not creds.valid:
                    if creds and creds.expired and creds.refresh_token:
                        creds.refresh(Request())
                    elif os.path.exists(self.credentials_file):
                        flow = InstalledAppFlow.from_client_secrets_file(
                            self.credentials_file, SCOPES)
                        creds = flow.run_local_server(port=0)
                    else:
                        print("⚠️ credentials.json not found for local development")
                        return False
                    # Save credentials for next run
                    with open(self.token_file, 'w') as token:
                        token.write(creds.to_json())
            self.service = build('drive', 'v3', credentials=creds)
            print("✅ Google Drive authentication successful!")
            return True
        except Exception as e:
            print(f"❌ Google Drive authentication failed: {e}")
            return False

    def setup_rag_folder(self):
        """Find the RAG folder in Google Drive, creating it if absent.

        Sets ``self.rag_folder_id``.

        Returns:
            True on success, False if the Drive request fails.
        """
        try:
            # Exclude trashed items so a deleted folder is never reused.
            query = (
                f"name='{RAG_FOLDER_NAME}' "
                "and mimeType='application/vnd.google-apps.folder' "
                "and trashed=false"
            )
            results = self.service.files().list(
                q=query,
                fields="files(id, name)"
            ).execute()
            folders = results.get('files', [])
            if folders:
                self.rag_folder_id = folders[0]['id']
                print(f"✅ Found existing RAG folder: {RAG_FOLDER_NAME}")
            else:
                # Create new folder
                folder_metadata = {
                    'name': RAG_FOLDER_NAME,
                    'mimeType': 'application/vnd.google-apps.folder'
                }
                folder = self.service.files().create(
                    body=folder_metadata,
                    fields='id'
                ).execute()
                self.rag_folder_id = folder.get('id')
                print(f"✅ Created new RAG folder: {RAG_FOLDER_NAME}")
            return True
        except Exception as e:
            print(f"❌ Error setting up RAG folder: {e}")
            return False

    def setup_rag_file(self):
        """Find the RAG data file inside the RAG folder, creating it if absent.

        Sets ``self.rag_file_id``. A newly created file contains an empty
        ``news_entries`` list plus creation metadata.

        Returns:
            True on success, False if no folder is set or a Drive request fails.
        """
        try:
            if not self.rag_folder_id:
                # Without a parent folder the lookup and the create would both
                # target the wrong location.
                print("❌ Error setting up RAG file: RAG folder is not set up")
                return False
            # Documented Drive query form is "'<folder id>' in parents";
            # trashed files are excluded so a deleted file is never reused.
            query = (
                f"name='{RAG_FILE_NAME}' "
                f"and '{self.rag_folder_id}' in parents "
                "and trashed=false"
            )
            results = self.service.files().list(
                q=query,
                fields="files(id, name)"
            ).execute()
            files = results.get('files', [])
            if files:
                self.rag_file_id = files[0]['id']
                print(f"✅ Found existing RAG file: {RAG_FILE_NAME}")
            else:
                # Create new file with empty data
                initial_data = {
                    "metadata": {
                        "created_at": datetime.now().isoformat(),
                        "description": "High-confidence Vietnamese fake news for RAG",
                        "threshold": CONFIDENCE_THRESHOLD,
                        "total_entries": 0
                    },
                    "news_entries": []
                }
                file_metadata = {
                    'name': RAG_FILE_NAME,
                    'parents': [self.rag_folder_id]
                }
                payload = json.dumps(initial_data, ensure_ascii=False, indent=2)
                media = MediaIoBaseUpload(
                    io.BytesIO(payload.encode('utf-8')),
                    mimetype='application/json'
                )
                created = self.service.files().create(
                    body=file_metadata,
                    media_body=media,
                    fields='id'
                ).execute()
                self.rag_file_id = created.get('id')
                print(f"✅ Created new RAG file: {RAG_FILE_NAME}")
            return True
        except Exception as e:
            print(f"❌ Error setting up RAG file: {e}")
            return False

    def _download_rag_data(self):
        """Download and parse the RAG file, raising on any failure.

        Unlike ``load_rag_data`` this never substitutes an empty dataset, so
        callers that intend to write the data back can tell a failed download
        apart from a file that is genuinely empty.

        Returns:
            The parsed dict, with ``metadata`` and ``news_entries`` present.

        Raises:
            ValueError: if the file content is not a JSON object or
                ``news_entries`` is not a list.
            Exception: whatever the Drive client or JSON decoding raises.
        """
        request = self.service.files().get_media(fileId=self.rag_file_id)
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(buffer, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()
        data = json.loads(buffer.getvalue().decode('utf-8'))
        if not isinstance(data, dict):
            raise ValueError("RAG file does not contain a JSON object")
        data.setdefault('metadata', {})
        if not isinstance(data.setdefault('news_entries', []), list):
            raise ValueError("RAG file 'news_entries' is not a list")
        return data

    def load_rag_data(self):
        """Load existing RAG data from Google Drive.

        Returns:
            The stored dataset, or an empty dataset when no file id is set or
            the download/parsing fails (the error is printed, not raised).
        """
        try:
            if not self.rag_file_id:
                return self._empty_rag_data()
            data = self._download_rag_data()
            print(f"📚 Loaded {data.get('metadata', {}).get('total_entries', 0)} entries from RAG file")
            return data
        except Exception as e:
            print(f"❌ Error loading RAG data: {e}")
            return self._empty_rag_data()

    def save_rag_data(self, data):
        """Upload ``data`` as the new content of the RAG file.

        ``metadata.last_updated`` and ``metadata.total_entries`` are refreshed
        on ``data`` (in place) before upload.

        Args:
            data: dataset dict; missing ``metadata`` / ``news_entries`` keys
                are created.

        Returns:
            True on success, False if no file id is set or the upload fails.
        """
        try:
            if not self.rag_file_id:
                return False
            # Update metadata
            metadata = data.setdefault('metadata', {})
            entries = data.setdefault('news_entries', [])
            metadata['last_updated'] = datetime.now().isoformat()
            metadata['total_entries'] = len(entries)
            # Convert to JSON
            json_data = json.dumps(data, ensure_ascii=False, indent=2)
            media = MediaIoBaseUpload(
                io.BytesIO(json_data.encode('utf-8')),
                mimetype='application/json'
            )
            # Update the file
            self.service.files().update(
                fileId=self.rag_file_id,
                media_body=media
            ).execute()
            print(f"✅ Saved {len(entries)} entries to RAG file")
            return True
        except Exception as e:
            print(f"❌ Error saving RAG data: {e}")
            return False

    def add_high_confidence_news(self, news_text, gemini_analysis, gemini_confidence,
                                 prediction, search_results=None, distilbert_confidence=None):
        """Append a news item to the RAG file if it clears the confidence bar.

        Args:
            news_text: text of the news item; its MD5 digest is the dedup key.
            gemini_analysis: analysis text stored alongside the item.
            gemini_confidence: confidence as a 0-1 fraction; must be at least
                ``CONFIDENCE_THRESHOLD``.
            prediction: verdict label stored with the item.
            search_results: optional list stored with the item (default ``[]``).
            distilbert_confidence: optional secondary confidence value.

        Returns:
            True if the entry was added and saved. False if the confidence is
            too low, the item is a duplicate, the RAG file is unavailable or
            unreadable, or saving fails.
        """
        try:
            # Check confidence threshold
            if gemini_confidence < CONFIDENCE_THRESHOLD:
                print(f"⚠️ Confidence {gemini_confidence:.1%} below threshold {CONFIDENCE_THRESHOLD:.1%}")
                return False
            if not self.rag_file_id:
                print("⚠️ RAG file is not initialized; news was not saved")
                return False
            # Create content hash for deduplication
            content_hash = hashlib.md5(news_text.encode('utf-8')).hexdigest()
            # Use the raising loader: if the download fails we must abort
            # here, otherwise an empty dataset would be written back and
            # wipe every previously stored entry.
            data = self._download_rag_data()
            entries = data['news_entries']
            # Check if entry already exists
            if any(entry.get('content_hash') == content_hash for entry in entries):
                print(f"⚠️ News already exists in RAG (hash: {content_hash[:8]}...)")
                return False
            # Next id is one past the largest id in use (never below
            # len + 1), so ids stay unique even if entries were removed.
            known_ids = [entry.get('id') for entry in entries
                         if isinstance(entry.get('id'), int)]
            next_id = max(known_ids + [len(entries)]) + 1
            # Create new entry
            new_entry = {
                'id': next_id,
                'content_hash': content_hash,
                'news_text': news_text,
                'prediction': prediction,
                'gemini_confidence': gemini_confidence,
                'gemini_analysis': gemini_analysis,
                'distilbert_confidence': distilbert_confidence,
                'search_results': search_results or [],
                'created_at': datetime.now().isoformat(),
                'source': 'user_input',
                'verified': True  # High confidence means verified
            }
            # Add to data
            entries.append(new_entry)
            # Save to Google Drive
            if not self.save_rag_data(data):
                return False
            print("✅ Added high-confidence news to RAG:")
            print(f" 📰 News: {news_text[:100]}...")
            print(f" 🎯 Prediction: {prediction}")
            print(f" 📊 Confidence: {gemini_confidence:.1%}")
            print(f" 🔗 Hash: {content_hash[:8]}...")
            return True
        except Exception as e:
            print(f"❌ Error adding news to RAG: {e}")
            return False

    def search_rag_news(self, query_text, limit=5):
        """Return stored entries whose text or analysis contains ``query_text``.

        Matching is a case-insensitive substring test against ``news_text``
        and ``gemini_analysis``. Results are ordered by confidence, then
        creation time, both descending.

        Args:
            query_text: substring to look for.
            limit: maximum number of results.

        Returns:
            A list of result dicts (possibly empty); ``[]`` on any error.
        """
        try:
            data = self.load_rag_data()
            entries = data.get('news_entries') or []
            if not entries:
                return []
            query_lower = query_text.lower()
            results = []
            for entry in entries:
                # Stored fields may be null (e.g. no analysis supplied), so
                # coerce to '' before lower-casing.
                text = (entry.get('news_text') or '').lower()
                analysis = (entry.get('gemini_analysis') or '').lower()
                if query_lower in text or query_lower in analysis:
                    results.append({
                        'news_text': entry.get('news_text'),
                        'prediction': entry.get('prediction'),
                        'confidence': entry.get('gemini_confidence') or 0,
                        'analysis': entry.get('gemini_analysis'),
                        'created_at': entry.get('created_at') or '',
                        'id': entry.get('id')
                    })
            # Sort by confidence and creation date
            results.sort(key=lambda x: (x['confidence'], x['created_at']), reverse=True)
            results = results[:limit]
            if results:
                print(f"🔍 Found {len(results)} similar entries in RAG")
            return results
        except Exception as e:
            print(f"❌ Error searching RAG news: {e}")
            return []

    def get_rag_statistics(self):
        """Summarize the stored entries.

        Returns:
            A dict with ``total_entries``, ``real_count``, ``fake_count``,
            ``avg_confidence``, ``latest_entry``, ``folder_id`` and
            ``file_id``; or None if computing the statistics fails.
        """
        try:
            data = self.load_rag_data()
            entries = data.get('news_entries') or []
            if not entries:
                return {
                    'total_entries': 0,
                    'real_count': 0,
                    'fake_count': 0,
                    'avg_confidence': 0,
                    'latest_entry': None,
                    'folder_id': self.rag_folder_id,
                    'file_id': self.rag_file_id
                }
            real_count = sum(1 for entry in entries if entry['prediction'] == 'REAL')
            fake_count = sum(1 for entry in entries if entry['prediction'] == 'FAKE')
            avg_confidence = sum(entry['gemini_confidence'] for entry in entries) / len(entries)
            # ISO-8601 timestamps compare correctly as strings.
            latest_entry = max(entries, key=lambda x: x['created_at'])
            return {
                'total_entries': len(entries),
                'real_count': real_count,
                'fake_count': fake_count,
                'avg_confidence': avg_confidence,
                'latest_entry': latest_entry,
                'folder_id': self.rag_folder_id,
                'file_id': self.rag_file_id
            }
        except Exception as e:
            print(f"❌ Error getting RAG statistics: {e}")
            return None

    def initialize(self):
        """Authenticate, then locate or create the RAG folder and file.

        Returns:
            True only if all three steps succeed; stops at the first failure.
        """
        print("🚀 Initializing RAG News Manager...")
        if not self.authenticate():
            return False
        if not self.setup_rag_folder():
            return False
        if not self.setup_rag_file():
            return False
        print("✅ RAG News Manager initialized successfully!")
        return True
# Global instance: module-level manager shared by the convenience wrappers
# below. Constructing it only sets attributes; no Drive access happens until
# initialize() is called.
rag_manager = RAGNewsManager()
def initialize_rag_system():
    """Run the shared manager's start-up sequence and report whether it succeeded."""
    is_ready = rag_manager.initialize()
    return is_ready
def add_news_to_rag(news_text, gemini_analysis, gemini_confidence, prediction,
                    search_results=None, distilbert_confidence=None):
    """Forward a news item to the shared manager, which stores it only if confident enough."""
    was_added = rag_manager.add_high_confidence_news(
        news_text=news_text,
        gemini_analysis=gemini_analysis,
        gemini_confidence=gemini_confidence,
        prediction=prediction,
        search_results=search_results,
        distilbert_confidence=distilbert_confidence,
    )
    return was_added
def search_rag_for_context(query_text, limit=3):
    """Look up stored entries matching ``query_text`` via the shared manager."""
    matches = rag_manager.search_rag_news(query_text, limit=limit)
    return matches
def get_rag_stats():
    """Return the shared manager's statistics dict (None if it could not be computed)."""
    summary = rag_manager.get_rag_statistics()
    return summary
if __name__ == "__main__":
    # Manual smoke test: initialize, store one sample item, print statistics.
    print("Testing RAG News Manager...")
    if not initialize_rag_system():
        print("❌ Failed to initialize RAG system")
    else:
        # Sample entry whose confidence (98%) clears the storage threshold.
        sample_entry = {
            "news_text": "Argentina vô địch World Cup 2022 là sự thật",
            "gemini_analysis": "1. KẾT LUẬN: THẬT\n2. ĐỘ TIN CẬY: THẬT: 98% / GIẢ: 2%",
            "gemini_confidence": 0.98,
            "prediction": "REAL",
        }
        if not add_news_to_rag(**sample_entry):
            print("❌ Failed to add test news")
        else:
            print("✅ Test news added successfully!")
            # Report what is now stored.
            summary = get_rag_stats()
            if summary:
                print("📊 RAG Statistics:")
                print(f" Total entries: {summary['total_entries']}")
                print(f" Real news: {summary['real_count']}")
                print(f" Fake news: {summary['fake_count']}")
                print(f" Average confidence: {summary['avg_confidence']:.1%}")
                print(f" Google Drive folder ID: {summary['folder_id']}")
                print(f" Google Drive file ID: {summary['file_id']}")