# FakeNews_Detector / rag_news_manager.py
# NLong's picture
# Upload 12 files
# b5fb8d2 verified
#!/usr/bin/env python3
"""
Enhanced RAG News Manager for Google Drive
Saves high-confidence news (95%+ from Gemini) to Google Drive for RAG purposes
"""
import json
import os
import hashlib
from datetime import datetime
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload
import io
# Configuration
# OAuth scope list passed to every credential constructor in authenticate().
SCOPES = ['https://www.googleapis.com/auth/drive.file']
# Name of the Drive folder that is looked up (or created) to hold the RAG data file.
RAG_FOLDER_NAME = "Vietnamese_Fake_News_RAG"
# Name of the JSON file, inside that folder, in which the saved entries are kept.
RAG_FILE_NAME = "high_confidence_news.json"
# Minimum Gemini confidence, as a 0-1 fraction, an item needs before it is stored;
# anything strictly below this value is rejected by add_high_confidence_news().
CONFIDENCE_THRESHOLD = 0.95  # 95% threshold
class RAGNewsManager:
    """Keep high-confidence news verdicts in a JSON file on Google Drive.

    Typical use is ``initialize()`` once, followed by
    ``add_high_confidence_news()``, ``search_rag_news()`` and
    ``get_rag_statistics()``. Unless a method says otherwise, problems are
    reported by printing a message and returning a failure value
    (``False``, ``[]`` or ``None``) rather than by raising.
    """

    def __init__(self):
        """Set up an unauthenticated manager; no network or file I/O happens here."""
        self.service = None          # Drive v3 client, set by authenticate()
        self.rag_folder_id = None    # Drive id of the RAG folder, set by setup_rag_folder()
        self.rag_file_id = None      # Drive id of the RAG JSON file, set by setup_rag_file()
        self.credentials_file = 'credentials.json'
        self.token_file = 'token.json'

    @staticmethod
    def _empty_rag_data():
        """Return a fresh, empty dataset (a new dict on every call)."""
        return {"metadata": {"total_entries": 0}, "news_entries": []}

    def _space_credentials(self):
        """Build credentials from Hugging Face Space secrets, or return None if any is missing."""
        client_id = os.getenv('GOOGLE_CLIENT_ID')
        client_secret = os.getenv('GOOGLE_CLIENT_SECRET')
        refresh_token = os.getenv('GOOGLE_REFRESH_TOKEN')
        if not (client_id and client_secret and refresh_token):
            print("⚠️ Google Drive credentials not found in Hugging Face secrets")
            return None
        return Credentials.from_authorized_user_info({
            'client_id': client_id,
            'client_secret': client_secret,
            'refresh_token': refresh_token,
            'token_uri': 'https://oauth2.googleapis.com/token',
        }, SCOPES)

    def _local_credentials(self):
        """Load, refresh or interactively obtain credentials for local development.

        Returns None when neither a usable token file nor ``credentials.json``
        is available. Newly obtained or refreshed credentials are written back
        to the token file for the next run.
        """
        creds = None
        if os.path.exists(self.token_file):
            creds = Credentials.from_authorized_user_file(self.token_file, SCOPES)
        if creds and creds.valid:
            return creds

        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        elif os.path.exists(self.credentials_file):
            flow = InstalledAppFlow.from_client_secrets_file(
                self.credentials_file, SCOPES)
            creds = flow.run_local_server(port=0)
        else:
            print("⚠️ credentials.json not found for local development")
            return None

        # Save credentials for next run
        with open(self.token_file, 'w', encoding='utf-8') as token:
            token.write(creds.to_json())
        return creds

    def authenticate(self):
        """Authenticate with the Google Drive API and create ``self.service``.

        On a Hugging Face Space (``SPACE_ID`` is set) credentials come from
        environment variables; otherwise they come from local files.

        Returns:
            True when a Drive client was created, False otherwise.
        """
        try:
            if os.getenv('SPACE_ID') is not None:
                creds = self._space_credentials()
            else:
                creds = self._local_credentials()
            if creds is None:
                return False

            self.service = build('drive', 'v3', credentials=creds)
            print("✅ Google Drive authentication successful!")
            return True
        except Exception as e:
            print(f"❌ Google Drive authentication failed: {e}")
            return False

    def setup_rag_folder(self):
        """Find the RAG folder on Drive, creating it when it does not exist.

        Returns:
            True when ``self.rag_folder_id`` was set, False on any error.
        """
        try:
            # Exclude trashed items so a deleted folder is never re-adopted.
            query = (
                f"name='{RAG_FOLDER_NAME}' "
                "and mimeType='application/vnd.google-apps.folder' "
                "and trashed=false"
            )
            results = self.service.files().list(
                q=query,
                fields="files(id, name)"
            ).execute()
            folders = results.get('files', [])

            if folders:
                self.rag_folder_id = folders[0]['id']
                print(f"✅ Found existing RAG folder: {RAG_FOLDER_NAME}")
            else:
                folder_metadata = {
                    'name': RAG_FOLDER_NAME,
                    'mimeType': 'application/vnd.google-apps.folder',
                }
                folder = self.service.files().create(
                    body=folder_metadata,
                    fields='id'
                ).execute()
                self.rag_folder_id = folder.get('id')
                print(f"✅ Created new RAG folder: {RAG_FOLDER_NAME}")
            return True
        except Exception as e:
            print(f"❌ Error setting up RAG folder: {e}")
            return False

    def setup_rag_file(self):
        """Find the RAG data file inside the RAG folder, creating it when missing.

        Returns:
            True when ``self.rag_file_id`` was set, False on any error or when
            the folder has not been set up yet.
        """
        try:
            if not self.rag_folder_id:
                print("❌ Error setting up RAG file: RAG folder is not set up")
                return False

            # Drive query syntax puts the folder id on the left: '<id>' in parents.
            query = (
                f"name='{RAG_FILE_NAME}' "
                f"and '{self.rag_folder_id}' in parents "
                "and trashed=false"
            )
            results = self.service.files().list(
                q=query,
                fields="files(id, name)"
            ).execute()
            files = results.get('files', [])

            if files:
                self.rag_file_id = files[0]['id']
                print(f"✅ Found existing RAG file: {RAG_FILE_NAME}")
            else:
                # Create new file with empty data
                initial_data = {
                    "metadata": {
                        "created_at": datetime.now().isoformat(),
                        "description": "High-confidence Vietnamese fake news for RAG",
                        "threshold": CONFIDENCE_THRESHOLD,
                        "total_entries": 0,
                    },
                    "news_entries": [],
                }
                file_metadata = {
                    'name': RAG_FILE_NAME,
                    'parents': [self.rag_folder_id],
                }
                payload = json.dumps(initial_data, ensure_ascii=False, indent=2)
                media = MediaIoBaseUpload(
                    io.BytesIO(payload.encode('utf-8')),
                    mimetype='application/json'
                )
                created = self.service.files().create(
                    body=file_metadata,
                    media_body=media,
                    fields='id'
                ).execute()
                self.rag_file_id = created.get('id')
                print(f"✅ Created new RAG file: {RAG_FILE_NAME}")
            return True
        except Exception as e:
            print(f"❌ Error setting up RAG file: {e}")
            return False

    def load_rag_data(self, raise_on_error=False):
        """Download and parse the RAG dataset from Google Drive.

        Args:
            raise_on_error: When False (default) a failed download or parse is
                reported and an empty dataset is returned. When True the
                exception is re-raised, so that callers about to write the
                data back cannot mistake a failure for an empty file.

        Returns:
            A dict that always has ``metadata`` and ``news_entries`` keys. An
            empty dataset is returned when no RAG file has been set up.
        """
        if not self.rag_file_id:
            return self._empty_rag_data()

        try:
            request = self.service.files().get_media(fileId=self.rag_file_id)
            buffer = io.BytesIO()
            downloader = MediaIoBaseDownload(buffer, request)
            done = False
            while not done:
                _, done = downloader.next_chunk()

            data = json.loads(buffer.getvalue().decode('utf-8'))
            if not isinstance(data, dict):
                raise ValueError("RAG file does not contain a JSON object")
            data.setdefault('metadata', {})
            data.setdefault('news_entries', [])
            print(f"📚 Loaded {data['metadata'].get('total_entries', 0)} entries from RAG file")
            return data
        except Exception as e:
            print(f"❌ Error loading RAG data: {e}")
            if raise_on_error:
                raise
            return self._empty_rag_data()

    def save_rag_data(self, data):
        """Upload ``data`` as the new content of the RAG file.

        ``metadata.last_updated`` and ``metadata.total_entries`` are refreshed
        on ``data`` in place before uploading.

        Returns:
            True on success; False when no RAG file is set up or on any error.
        """
        try:
            if not self.rag_file_id:
                return False

            entries = data.setdefault('news_entries', [])
            metadata = data.setdefault('metadata', {})
            metadata['last_updated'] = datetime.now().isoformat()
            metadata['total_entries'] = len(entries)

            json_data = json.dumps(data, ensure_ascii=False, indent=2)
            media = MediaIoBaseUpload(
                io.BytesIO(json_data.encode('utf-8')),
                mimetype='application/json'
            )
            self.service.files().update(
                fileId=self.rag_file_id,
                media_body=media
            ).execute()
            print(f"✅ Saved {len(entries)} entries to RAG file")
            return True
        except Exception as e:
            print(f"❌ Error saving RAG data: {e}")
            return False

    def add_high_confidence_news(self, news_text, gemini_analysis, gemini_confidence,
                                 prediction, search_results=None, distilbert_confidence=None):
        """Append a news item to the RAG file when its confidence meets the threshold.

        Items are de-duplicated by the MD5 of ``news_text``. If the existing
        file cannot be downloaded the call aborts without saving, so a
        transient Drive error never overwrites stored entries.

        Args:
            news_text: The news content to store.
            gemini_analysis: Analysis text stored alongside the item.
            gemini_confidence: Confidence as a 0-1 fraction.
            prediction: Label stored with the item (the statistics count
                ``'REAL'`` and ``'FAKE'``).
            search_results: Optional supporting results; stored as ``[]`` when falsy.
            distilbert_confidence: Optional second-model confidence.

        Returns:
            True when the item was saved; False when it was rejected, already
            present, or an error occurred.
        """
        try:
            if gemini_confidence < CONFIDENCE_THRESHOLD:
                print(f"⚠️ Confidence {gemini_confidence:.1%} below threshold {CONFIDENCE_THRESHOLD:.1%}")
                return False

            if not self.rag_file_id:
                print("⚠️ RAG file is not set up; call initialize() first")
                return False

            # Create content hash for deduplication
            content_hash = hashlib.md5(news_text.encode('utf-8')).hexdigest()

            # A failed download must abort here instead of looking like an empty file.
            data = self.load_rag_data(raise_on_error=True)
            entries = data['news_entries']

            if any(entry.get('content_hash') == content_hash for entry in entries):
                print(f"⚠️ News already exists in RAG (hash: {content_hash[:8]}...)")
                return False

            # max(id) + 1 equals len + 1 for an untouched file but stays unique
            # after entries have been removed.
            known_ids = [entry.get('id') for entry in entries
                         if isinstance(entry.get('id'), int)]
            new_entry = {
                'id': max(known_ids, default=0) + 1,
                'content_hash': content_hash,
                'news_text': news_text,
                'prediction': prediction,
                'gemini_confidence': gemini_confidence,
                'gemini_analysis': gemini_analysis,
                'distilbert_confidence': distilbert_confidence,
                'search_results': search_results or [],
                'created_at': datetime.now().isoformat(),
                'source': 'user_input',
                'verified': True,  # High confidence means verified
            }
            entries.append(new_entry)

            if not self.save_rag_data(data):
                return False

            print("✅ Added high-confidence news to RAG:")
            print(f" 📰 News: {news_text[:100]}...")
            print(f" 🎯 Prediction: {prediction}")
            print(f" 📊 Confidence: {gemini_confidence:.1%}")
            print(f" 🔗 Hash: {content_hash[:8]}...")
            return True
        except Exception as e:
            print(f"❌ Error adding news to RAG: {e}")
            return False

    def search_rag_news(self, query_text, limit=5):
        """Return stored entries whose text or analysis contains ``query_text``.

        Matching is a case-insensitive substring test. Results are ordered by
        confidence, then creation time, highest first, and cut to ``limit``.
        A blank query returns no results.

        Returns:
            A list of dicts with ``news_text``, ``prediction``, ``confidence``,
            ``analysis``, ``created_at`` and ``id``; ``[]`` on error.
        """
        try:
            if not query_text or not query_text.strip():
                return []
            needle = query_text.lower()

            entries = self.load_rag_data().get('news_entries') or []
            matches = []
            for entry in entries:
                # Stored fields may be None (e.g. no analysis); treat that as empty text.
                haystacks = (entry.get('news_text') or '',
                             entry.get('gemini_analysis') or '')
                if any(needle in text.lower() for text in haystacks):
                    matches.append({
                        'news_text': entry.get('news_text'),
                        'prediction': entry.get('prediction'),
                        'confidence': entry.get('gemini_confidence'),
                        'analysis': entry.get('gemini_analysis'),
                        'created_at': entry.get('created_at'),
                        'id': entry.get('id'),
                    })

            # Sort by confidence and creation date
            matches.sort(
                key=lambda m: (m['confidence'] or 0, m['created_at'] or ''),
                reverse=True,
            )
            matches = matches[:limit]
            if matches:
                print(f"🔍 Found {len(matches)} similar entries in RAG")
            return matches
        except Exception as e:
            print(f"❌ Error searching RAG news: {e}")
            return []

    def get_rag_statistics(self):
        """Summarise the stored entries.

        Returns:
            A dict with ``total_entries``, ``real_count``, ``fake_count``,
            ``avg_confidence``, ``latest_entry``, ``folder_id`` and
            ``file_id``; None on error.
        """
        try:
            entries = self.load_rag_data().get('news_entries') or []
            stats = {
                'total_entries': len(entries),
                'real_count': 0,
                'fake_count': 0,
                'avg_confidence': 0,
                'latest_entry': None,
                'folder_id': self.rag_folder_id,
                'file_id': self.rag_file_id,
            }
            if not entries:
                return stats

            stats['real_count'] = sum(1 for e in entries if e.get('prediction') == 'REAL')
            stats['fake_count'] = sum(1 for e in entries if e.get('prediction') == 'FAKE')
            stats['avg_confidence'] = (
                sum(e.get('gemini_confidence') or 0 for e in entries) / len(entries)
            )
            # ISO-8601 timestamps sort chronologically as plain strings.
            stats['latest_entry'] = max(entries, key=lambda e: e.get('created_at') or '')
            return stats
        except Exception as e:
            print(f"❌ Error getting RAG statistics: {e}")
            return None

    def initialize(self):
        """Authenticate, then locate or create the RAG folder and file.

        Returns:
            True when every step succeeded; False as soon as one fails.
        """
        print("🚀 Initializing RAG News Manager...")
        if not self.authenticate():
            return False
        if not self.setup_rag_folder():
            return False
        if not self.setup_rag_file():
            return False
        print("✅ RAG News Manager initialized successfully!")
        return True
# Global instance shared by the module-level helper functions below.
# Constructing it only sets attributes; Drive is not contacted until
# initialize_rag_system() is called.
rag_manager = RAGNewsManager()
def initialize_rag_system():
    """Run the shared manager's start-up sequence and return whether it succeeded."""
    ready = rag_manager.initialize()
    return ready
def add_news_to_rag(news_text, gemini_analysis, gemini_confidence, prediction,
                    search_results=None, distilbert_confidence=None):
    """Forward a news item to the shared manager, which stores it only if confident enough."""
    stored = rag_manager.add_high_confidence_news(
        news_text,
        gemini_analysis,
        gemini_confidence,
        prediction,
        search_results,
        distilbert_confidence,
    )
    return stored
def search_rag_for_context(query_text, limit=3):
    """Look up stored entries matching ``query_text`` via the shared manager (at most ``limit``)."""
    context_entries = rag_manager.search_rag_news(query_text, limit)
    return context_entries
def get_rag_stats():
    """Return the shared manager's statistics for the stored entries."""
    summary = rag_manager.get_rag_statistics()
    return summary
def _run_self_test():
    """Initialise the RAG system, add one sample item and print the statistics."""
    if not initialize_rag_system():
        print("❌ Failed to initialize RAG system")
        return

    # Sample entry used to exercise the add path
    sample = {
        'news_text': "Argentina vô địch World Cup 2022 là sự thật",
        'gemini_analysis': "1. KẾT LUẬN: THẬT\n2. ĐỘ TIN CẬY: THẬT: 98% / GIẢ: 2%",
        'gemini_confidence': 0.98,
        'prediction': "REAL",
    }
    if not add_news_to_rag(**sample):
        print("❌ Failed to add test news")
        return
    print("✅ Test news added successfully!")

    # Statistics are only reported after a successful add
    stats = get_rag_stats()
    if not stats:
        return
    report = [
        "📊 RAG Statistics:",
        f" Total entries: {stats['total_entries']}",
        f" Real news: {stats['real_count']}",
        f" Fake news: {stats['fake_count']}",
        f" Average confidence: {stats['avg_confidence']:.1%}",
        f" Google Drive folder ID: {stats['folder_id']}",
        f" Google Drive file ID: {stats['file_id']}",
    ]
    for line in report:
        print(line)


if __name__ == "__main__":
    # Test the RAG system
    print("Testing RAG News Manager...")
    _run_self_test()