"""
Google Search Console API client for SEO Report Generator
Handles OAuth authentication and Search Analytics API queries using Google API client
"""

import os
import json
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
import time

try:
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import Flow
    from googleapiclient.discovery import build
    GOOGLE_LIBS_AVAILABLE = True
except ImportError:
    GOOGLE_LIBS_AVAILABLE = False

    # Create dummy classes to prevent import errors. They only keep the names
    # below resolvable at import time; GSCClient.__init__ refuses to run
    # without the real libraries.
    class Credentials:
        pass

    class Request:
        pass

    class Flow:
        @classmethod
        def from_client_config(cls, *args, **kwargs):
            pass

    def build(*args, **kwargs):
        pass

from utils import safe_pct


class GSCClient:
    """Client for Google Search Console OAuth and Search Analytics queries.

    Configuration is read from environment variables in ``__init__``:
    ``GOOGLE_CLIENT_ID``, ``GOOGLE_CLIENT_SECRET``, ``GSC_REDIRECT_URI``,
    ``GSC_PROPERTY_URL``, ``GSC_ROW_LIMIT`` and ``GSC_DAYS``.
    """

    def __init__(self):
        """Read configuration from the environment and set up the cache.

        Raises:
            ImportError: if the Google API client libraries are not installed.
            ValueError: if ``GSC_ROW_LIMIT`` or ``GSC_DAYS`` is set to a value
                that ``int()`` cannot parse.
        """
        if not GOOGLE_LIBS_AVAILABLE:
            raise ImportError("Google API libraries not installed. Run: pip install google-api-python-client google-auth-oauthlib google-auth")

        self.client_id = os.getenv('GOOGLE_CLIENT_ID')
        self.client_secret = os.getenv('GOOGLE_CLIENT_SECRET')
        self.redirect_uri = os.getenv('GSC_REDIRECT_URI', 'http://localhost:7860/auth/gsc/callback')
        self.property_url = os.getenv('GSC_PROPERTY_URL')

        # Configuration
        self.row_limit = int(os.getenv('GSC_ROW_LIMIT', 1000))
        self.days = int(os.getenv('GSC_DAYS', 28))

        # OAuth2 scopes
        self.scopes = ['https://www.googleapis.com/auth/webmasters.readonly']

        # Cache: maps cache key -> (timestamp, API response)
        self.cache = {}
        self.cache_ttl = 3600  # 1 hour

    def _build_flow(self):
        """Build the OAuth2 flow shared by get_auth_url and exchange_code.

        Raises:
            ValueError: if the client id or client secret is not configured.
        """
        if not self.client_id or not self.client_secret:
            raise ValueError("GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET must be configured")

        # Create OAuth2 client configuration
        client_config = {
            "web": {
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "redirect_uris": [self.redirect_uri],
            }
        }

        # Create the flow
        return Flow.from_client_config(
            client_config,
            scopes=self.scopes,
            redirect_uri=self.redirect_uri,
        )

    def get_auth_url(self, state: Optional[str] = None) -> str:
        """Generate OAuth authorization URL using Google OAuth2 flow.

        Args:
            state: Optional opaque value to embed in the authorization URL.
                When omitted, the state is left to the OAuth library.

        Returns:
            The authorization URL to redirect the user to.

        Raises:
            ValueError: if the client id or client secret is not configured.
        """
        flow = self._build_flow()

        auth_params = {
            'access_type': 'offline',
            'include_granted_scopes': 'true',
            'prompt': 'consent',
        }
        # Forward the caller's state only when one was supplied; previously the
        # argument was accepted but never reached the authorization URL.
        if state is not None:
            auth_params['state'] = state

        # Generate authorization URL
        auth_url, _ = flow.authorization_url(**auth_params)
        return auth_url

    def exchange_code(self, auth_code: str) -> Dict[str, Any]:
        """Exchange authorization code for access token using Google OAuth2 flow.

        Args:
            auth_code: The authorization code received on the redirect URI.

        Returns:
            A plain dict of credential fields (``access_token``,
            ``refresh_token``, ``token_uri``, ``client_id``, ``client_secret``,
            ``scopes``) in a format compatible with session storage.

        Raises:
            ValueError: if the client id or client secret is not configured.
        """
        flow = self._build_flow()

        # Exchange code for token
        flow.fetch_token(code=auth_code)

        # Return credentials in a format compatible with session storage
        credentials = flow.credentials
        return {
            'access_token': credentials.token,
            'refresh_token': credentials.refresh_token,
            'token_uri': credentials.token_uri,
            'client_id': credentials.client_id,
            'client_secret': credentials.client_secret,
            'scopes': credentials.scopes,
        }

    def get_credentials_from_session(self, session_data: Dict[str, Any]) -> Credentials:
        """Create Credentials object from session data.

        Args:
            session_data: Dict in the shape returned by ``exchange_code``.
                Missing keys are passed through as ``None``.
        """
        return Credentials(
            token=session_data.get('access_token'),
            refresh_token=session_data.get('refresh_token'),
            token_uri=session_data.get('token_uri'),
            client_id=session_data.get('client_id'),
            client_secret=session_data.get('client_secret'),
            scopes=session_data.get('scopes'),
        )

    def get_search_analytics(self, session_data: Dict[str, Any], property_url: Optional[str] = None) -> Dict[str, Any]:
        """Fetch search analytics data from GSC using Google API client.

        Responses are cached in memory for ``self.cache_ttl`` seconds. The
        cache key is built from the property URL, day window and row limit
        only, so a fresh cached response is returned without consulting
        ``session_data``.

        Args:
            session_data: Dict in the shape returned by ``exchange_code``.
                ``session_data['access_token']`` is updated in place when the
                credentials are refreshed.
            property_url: Property to query; defaults to ``self.property_url``.

        Returns:
            The raw Search Analytics API response dict.

        Raises:
            ValueError: if no property URL is given or configured.
            Exception: if the API request fails; the original error is
                attached as ``__cause__``.
        """
        property_url = property_url or self.property_url
        if not property_url:
            raise ValueError("GSC_PROPERTY_URL not configured")

        # Check cache. The row limit is part of the key because it is part of
        # the request body below.
        cache_key = f"gsc_{property_url}_{self.days}_{self.row_limit}"
        cached = self.cache.get(cache_key)
        if cached is not None:
            cache_time, data = cached
            if time.time() - cache_time < self.cache_ttl:
                return data
            # Entry has expired; drop it so stale responses do not linger.
            del self.cache[cache_key]

        # Get credentials from session
        credentials = self.get_credentials_from_session(session_data)

        # Refresh token if needed
        if not credentials.valid:
            credentials.refresh(Request())
            # Update session with new token
            session_data['access_token'] = credentials.token

        # Build the Search Console service
        service = build('searchconsole', 'v1', credentials=credentials)

        # Calculate date range
        # NOTE(review): the API documents startDate/endDate as inclusive, so
        # this window appears to span self.days + 1 calendar days - confirm
        # that is intended.
        end_date = datetime.now() - timedelta(days=3)  # GSC has ~3 day delay
        start_date = end_date - timedelta(days=self.days)

        # Prepare the request body
        request_body = {
            'startDate': start_date.strftime('%Y-%m-%d'),
            'endDate': end_date.strftime('%Y-%m-%d'),
            'dimensions': ['query'],
            'searchType': 'web',
            'rowLimit': self.row_limit,
        }

        try:
            # Execute the search analytics query
            response = service.searchanalytics().query(
                siteUrl=property_url,
                body=request_body,
            ).execute()

            # Cache the result
            self.cache[cache_key] = (time.time(), response)
            return response
        except Exception as e:
            # Keep the generic exception type callers already catch, but chain
            # the cause so the underlying API error stays in the traceback.
            raise Exception(f"GSC API request failed: {str(e)}") from e

    def transform_gsc_data(self, gsc_response: Dict[str, Any], domain: str) -> Dict[str, Any]:
        """Transform GSC API response into keywords module format.

        Args:
            gsc_response: Raw Search Analytics response; its ``rows`` entries
                are read for ``keys``, ``clicks``, ``impressions``, ``ctr``
                and ``position``.
            domain: Not used by this method; kept for interface compatibility.

        Returns:
            A dict with totals, position distribution, best/worst keywords and
            opportunities. The same top-level keys are present whether or not
            the response contains rows.
        """
        rows = gsc_response.get('rows', [])

        if not rows:
            return {
                'data_source': 'Google Search Console',
                'totals': {'keywords': 0, 'estimated_traffic': 0},
                'distribution': {'top3': 0, 'top10': 0, 'top50': 0},
                'distribution_pct': {'top3': 0, 'top10': 0, 'top50': 0},
                'best_keywords': [],
                'worst_keywords': {'by_ctr': [], 'by_position': []},
                'opportunities': [],
                'competitor_summary': [],
                # Mirror the keys of the populated result so consumers can
                # index them unconditionally.
                'movement': {'new': 0, 'up': 0, 'down': 0, 'lost': 0},
                'data_sources': {
                    'positions': 'Google Search Console',
                    'volume': 'Google Search Console',
                    'enrichment_rate': 0.0  # No keywords were returned
                }
            }

        # Transform rows
        keywords = [
            {
                'query': row['keys'][0],
                'clicks': row['clicks'],
                'impressions': row['impressions'],
                'ctr': row['ctr'] * 100,  # Convert to percentage
                'avg_position': row['position'],
            }
            for row in rows
        ]

        # Calculate distribution (approximate based on avg_position).
        # The buckets are cumulative: top10 includes top3, top50 includes both.
        top3 = sum(1 for r in keywords if r['avg_position'] <= 3)
        top10 = sum(1 for r in keywords if r['avg_position'] <= 10)
        top50 = sum(1 for r in keywords if r['avg_position'] <= 50)
        total = len(keywords)

        # Best performers (sort by clicks, then CTR)
        best_keywords = sorted(keywords, key=lambda x: (x['clicks'], x['ctr']), reverse=True)[:15]

        # Transform best keywords to expected format
        best_keywords_formatted = [
            {
                'keyword': k['query'],
                'rank': round(k['avg_position'], 1),
                'url': '',  # GSC doesn't provide URL per query
                'volume': k['impressions'],
                'estimated_traffic': k['clicks'],
                'trend': 'stable',  # No historical data in single request
                'clicks': k['clicks'],
                'ctr': k['ctr']
            }
            for k in best_keywords
        ]

        # Worst performers
        worst_keywords = self._identify_worst_gsc_keywords(keywords)

        # Opportunities (high impressions, low CTR)
        opportunities = [
            {
                'keyword': k['query'],
                'impressions': k['impressions'],
                'ctr': k['ctr'],
                'avg_position': k['avg_position'],
                'clicks': k['clicks'],
                'priority_score': self._calculate_gsc_opportunity_score(k)
            }
            for k in keywords
            if k['impressions'] >= 100 and k['ctr'] < 2.0 and k['avg_position'] > 10
        ]
        opportunities.sort(key=lambda x: x['priority_score'], reverse=True)

        return {
            'data_source': 'Google Search Console',
            'totals': {
                'keywords': total,
                'estimated_traffic': sum(k['clicks'] for k in keywords)
            },
            'distribution': {
                'top3': top3,
                'top10': top10,
                'top50': top50
            },
            'distribution_pct': {
                'top3': safe_pct(top3, total),
                'top10': safe_pct(top10, total),
                'top50': safe_pct(top50, total)
            },
            'best_keywords': best_keywords_formatted,
            'worst_keywords': worst_keywords,
            'opportunities': opportunities[:50],
            'competitor_summary': [],  # GSC doesn't provide competitor data
            'movement': {'new': 0, 'up': 0, 'down': 0, 'lost': 0},  # Requires historical data
            'data_sources': {
                'positions': 'Google Search Console',
                'volume': 'Google Search Console',
                'enrichment_rate': 100.0  # GSC provides complete data
            }
        }

    def _identify_worst_gsc_keywords(self, keywords: List[Dict]) -> Dict[str, List[Dict]]:
        """Identify worst performing keywords from GSC data.

        Args:
            keywords: Dicts with ``query``, ``clicks``, ``impressions``,
                ``ctr`` (percentage) and ``avg_position``.

        Returns:
            ``{'by_ctr': [...], 'by_position': [...]}``, each capped at 20
            entries. ``by_ctr`` is sorted by ascending CTR, ``by_position`` by
            descending rounded position.
        """
        IMP_MIN = 100
        CTR_MIN = 1.0

        # Worst by CTR
        worst_by_ctr = [
            {
                'keyword': k['query'],
                'rank': round(k['avg_position'], 1),
                'impressions': k['impressions'],
                'estimated_ctr': k['ctr'],
                'clicks': k['clicks']
            }
            for k in keywords
            if k['impressions'] >= IMP_MIN and k['ctr'] < CTR_MIN
        ]

        # Worst by position
        worst_by_position = [
            {
                'keyword': k['query'],
                'rank': round(k['avg_position'], 1),
                'impressions': k['impressions'],
                'clicks': k['clicks'],
                'ctr': k['ctr']
            }
            for k in keywords
            if k['avg_position'] > 30 and k['impressions'] >= IMP_MIN
        ]

        # Sort and limit
        worst_by_ctr.sort(key=lambda x: x['estimated_ctr'])
        worst_by_position.sort(key=lambda x: x['rank'], reverse=True)

        return {
            'by_ctr': worst_by_ctr[:20],
            'by_position': worst_by_position[:20]
        }

    def _calculate_gsc_opportunity_score(self, keyword: Dict) -> float:
        """Calculate opportunity score for GSC keyword.

        The score is the mean of three components, rounded to one decimal:
        an impression score capped at 100, a CTR score that is zero at 5% CTR
        and above, and a position score that is zero at position 50 and beyond.
        """
        impressions = keyword['impressions']
        ctr = keyword['ctr']
        position = keyword['avg_position']

        # Higher impressions = more opportunity
        impression_score = min(100, impressions / 1000 * 10)

        # Lower CTR = more opportunity for improvement
        ctr_score = max(0, 5 - ctr) * 10

        # Closer to first page = more opportunity
        position_score = max(0, 50 - position)

        return round((impression_score + ctr_score + position_score) / 3, 1)