Spaces:
Running
Running
| """ | |
| Google Search Console API client for SEO Report Generator | |
| Handles OAuth authentication and Search Analytics API queries using Google API client | |
| """ | |
| import os | |
| import json | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, List, Optional | |
| import time | |
try:
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import Flow
    from googleapiclient.discovery import build

    GOOGLE_LIBS_AVAILABLE = True
except ImportError:
    GOOGLE_LIBS_AVAILABLE = False

    # Lightweight stand-ins so this module can still be imported when the
    # Google libraries are missing; GSCClient.__init__ raises ImportError
    # before any of these placeholders can actually be used.
    class Credentials:
        pass

    class Request:
        pass

    class Flow:
        # BUG FIX: the original defined this as a plain instance method that
        # happened to take `cls`. It is called as Flow.from_client_config(...)
        # (a classmethod on the real google_auth_oauthlib Flow), so the dummy
        # must be a classmethod to mirror that calling convention.
        @classmethod
        def from_client_config(cls, *args, **kwargs):
            pass

    def build(*args, **kwargs):
        pass
| from utils import safe_pct | |
class GSCClient:
    """Google Search Console API client.

    Handles the OAuth2 authorization-code flow, Search Analytics queries
    (with a simple in-memory TTL cache), and transformation of raw GSC
    responses into the report generator's keywords-module format.
    """

    def __init__(self):
        """Read client configuration from environment variables.

        Raises:
            ImportError: if the Google API client libraries are not installed.
        """
        if not GOOGLE_LIBS_AVAILABLE:
            raise ImportError("Google API libraries not installed. Run: pip install google-api-python-client google-auth-oauthlib google-auth")
        self.client_id = os.getenv('GOOGLE_CLIENT_ID')
        self.client_secret = os.getenv('GOOGLE_CLIENT_SECRET')
        self.redirect_uri = os.getenv('GSC_REDIRECT_URI', 'http://localhost:7860/auth/gsc/callback')
        self.property_url = os.getenv('GSC_PROPERTY_URL')
        # Query configuration (max rows per request, size of date window)
        self.row_limit = int(os.getenv('GSC_ROW_LIMIT', 1000))
        self.days = int(os.getenv('GSC_DAYS', 28))
        # Read-only scope is sufficient for reporting
        self.scopes = ['https://www.googleapis.com/auth/webmasters.readonly']
        # In-memory response cache: {cache_key: (stored_at_epoch, response)}
        self.cache = {}
        self.cache_ttl = 3600  # 1 hour

    def _client_config(self) -> Dict[str, Any]:
        """Build the OAuth2 'web' client config dict.

        Shared by get_auth_url() and exchange_code(), which previously
        duplicated this structure verbatim.
        """
        return {
            "web": {
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "redirect_uris": [self.redirect_uri]
            }
        }

    def _make_flow(self) -> "Flow":
        """Create a configured OAuth2 Flow for this client."""
        return Flow.from_client_config(
            self._client_config(),
            scopes=self.scopes,
            redirect_uri=self.redirect_uri
        )

    def get_auth_url(self, state: str = None) -> str:
        """Generate the OAuth authorization URL using the Google OAuth2 flow.

        Args:
            state: optional opaque CSRF token to round-trip through the
                provider and verify on callback.

        Returns:
            The URL the user should visit to grant access.

        Raises:
            ValueError: if client credentials are not configured.
        """
        if not self.client_id or not self.client_secret:
            raise ValueError("GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET must be configured")
        flow = self._make_flow()
        # offline access + consent prompt so a refresh_token is issued
        auth_params = {
            'access_type': 'offline',
            'include_granted_scopes': 'true',
            'prompt': 'consent'
        }
        # BUG FIX: the original accepted `state` but never forwarded it, so
        # the CSRF token was silently dropped from the OAuth round-trip.
        if state:
            auth_params['state'] = state
        auth_url, _ = flow.authorization_url(**auth_params)
        return auth_url

    def exchange_code(self, auth_code: str) -> Dict[str, Any]:
        """Exchange an authorization code for tokens.

        Args:
            auth_code: the code returned to the redirect URI by Google.

        Returns:
            A dict of credential fields suitable for session storage.

        Raises:
            ValueError: if client credentials are not configured.
        """
        # Consistency fix: validate configuration here as get_auth_url does.
        if not self.client_id or not self.client_secret:
            raise ValueError("GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET must be configured")
        flow = self._make_flow()
        flow.fetch_token(code=auth_code)
        # Flatten the credentials into plain JSON-serializable values so
        # they can live in a web session.
        credentials = flow.credentials
        return {
            'access_token': credentials.token,
            'refresh_token': credentials.refresh_token,
            'token_uri': credentials.token_uri,
            'client_id': credentials.client_id,
            'client_secret': credentials.client_secret,
            'scopes': credentials.scopes
        }

    def get_credentials_from_session(self, session_data: Dict[str, Any]) -> "Credentials":
        """Rehydrate a google.oauth2 Credentials object from session data."""
        return Credentials(
            token=session_data.get('access_token'),
            refresh_token=session_data.get('refresh_token'),
            token_uri=session_data.get('token_uri'),
            client_id=session_data.get('client_id'),
            client_secret=session_data.get('client_secret'),
            scopes=session_data.get('scopes')
        )

    def get_search_analytics(self, session_data: Dict[str, Any], property_url: str = None) -> Dict[str, Any]:
        """Fetch search analytics data from GSC using the Google API client.

        Args:
            session_data: stored credential fields (mutated in place if the
                access token is refreshed).
            property_url: GSC property to query; falls back to
                GSC_PROPERTY_URL from the environment.

        Returns:
            The raw Search Analytics API response dict.

        Raises:
            ValueError: if no property URL is available.
            RuntimeError: if the API request fails.
        """
        if not property_url:
            property_url = self.property_url
        if not property_url:
            raise ValueError("GSC_PROPERTY_URL not configured")
        # Serve from cache while the entry is younger than cache_ttl
        cache_key = f"gsc_{property_url}_{self.days}"
        if cache_key in self.cache:
            cache_time, data = self.cache[cache_key]
            if time.time() - cache_time < self.cache_ttl:
                return data
        credentials = self.get_credentials_from_session(session_data)
        # Refresh an expired access token and write it back to the session
        if not credentials.valid:
            credentials.refresh(Request())
            session_data['access_token'] = credentials.token
        service = build('searchconsole', 'v1', credentials=credentials)
        # GSC data lags real time by roughly 3 days, so end the window there
        end_date = datetime.now() - timedelta(days=3)
        start_date = end_date - timedelta(days=self.days)
        request_body = {
            'startDate': start_date.strftime('%Y-%m-%d'),
            'endDate': end_date.strftime('%Y-%m-%d'),
            'dimensions': ['query'],
            'searchType': 'web',
            'rowLimit': self.row_limit
        }
        try:
            response = service.searchanalytics().query(
                siteUrl=property_url,
                body=request_body
            ).execute()
            self.cache[cache_key] = (time.time(), response)
            return response
        except Exception as e:
            # RuntimeError subclasses Exception, so existing broad handlers
            # still catch it; `from e` preserves the original traceback.
            raise RuntimeError(f"GSC API request failed: {str(e)}") from e

    def transform_gsc_data(self, gsc_response: Dict[str, Any], domain: str) -> Dict[str, Any]:
        """Transform a GSC API response into the keywords-module format.

        Args:
            gsc_response: raw response from get_search_analytics().
            domain: the site's domain (currently unused, kept for interface
                compatibility with other data-source transformers).

        Returns:
            Dict with totals, position distribution, best/worst keywords,
            and CTR-improvement opportunities.
        """
        rows = gsc_response.get('rows', [])
        if not rows:
            # Empty response: return a fully-shaped zero result so templates
            # downstream never have to special-case missing keys.
            return {
                'data_source': 'Google Search Console',
                'totals': {'keywords': 0, 'estimated_traffic': 0},
                'distribution': {'top3': 0, 'top10': 0, 'top50': 0},
                'distribution_pct': {'top3': 0, 'top10': 0, 'top50': 0},
                'best_keywords': [],
                'worst_keywords': {'by_ctr': [], 'by_position': []},
                'opportunities': [],
                'competitor_summary': []
            }
        keywords = [
            {
                'query': row['keys'][0],
                'clicks': row['clicks'],
                'impressions': row['impressions'],
                'ctr': row['ctr'] * 100,  # API gives a fraction; store as percentage
                'avg_position': row['position']
            }
            for row in rows
        ]
        # Position distribution, approximated from average position per query
        top3 = sum(1 for r in keywords if r['avg_position'] <= 3)
        top10 = sum(1 for r in keywords if r['avg_position'] <= 10)
        top50 = sum(1 for r in keywords if r['avg_position'] <= 50)
        total = len(keywords)
        # Best performers: by clicks, tie-broken by CTR
        best_keywords = sorted(keywords, key=lambda x: (x['clicks'], x['ctr']), reverse=True)[:15]
        best_keywords_formatted = [
            {
                'keyword': k['query'],
                'rank': round(k['avg_position'], 1),
                'url': '',  # GSC doesn't provide URL per query
                'volume': k['impressions'],
                'estimated_traffic': k['clicks'],
                'trend': 'stable',  # no historical data in a single request
                'clicks': k['clicks'],
                'ctr': k['ctr']
            }
            for k in best_keywords
        ]
        worst_keywords = self._identify_worst_gsc_keywords(keywords)
        # Opportunities: visible (>=100 impressions) but underclicked
        # (CTR < 2%) queries stuck off the first page (position > 10)
        opportunities = [
            {
                'keyword': k['query'],
                'impressions': k['impressions'],
                'ctr': k['ctr'],
                'avg_position': k['avg_position'],
                'clicks': k['clicks'],
                'priority_score': self._calculate_gsc_opportunity_score(k)
            }
            for k in keywords
            if k['impressions'] >= 100 and k['ctr'] < 2.0 and k['avg_position'] > 10
        ]
        opportunities.sort(key=lambda x: x['priority_score'], reverse=True)
        return {
            'data_source': 'Google Search Console',
            'totals': {
                'keywords': total,
                'estimated_traffic': sum(k['clicks'] for k in keywords)
            },
            'distribution': {
                'top3': top3,
                'top10': top10,
                'top50': top50
            },
            'distribution_pct': {
                'top3': safe_pct(top3, total),
                'top10': safe_pct(top10, total),
                'top50': safe_pct(top50, total)
            },
            'best_keywords': best_keywords_formatted,
            'worst_keywords': worst_keywords,
            'opportunities': opportunities[:50],
            'competitor_summary': [],  # GSC doesn't provide competitor data
            'movement': {'new': 0, 'up': 0, 'down': 0, 'lost': 0},  # requires historical data
            'data_sources': {
                'positions': 'Google Search Console',
                'volume': 'Google Search Console',
                'enrichment_rate': 100.0  # GSC provides complete data
            }
        }

    def _identify_worst_gsc_keywords(self, keywords: List[Dict]) -> Dict[str, List[Dict]]:
        """Identify worst-performing keywords from transformed GSC rows.

        Returns:
            {'by_ctr': [...], 'by_position': [...]} — each a list of up to 20
            entries, worst first.
        """
        IMP_MIN = 100  # ignore queries with too few impressions to judge
        CTR_MIN = 1.0  # percent; below this CTR a visible query is "worst"
        worst_by_ctr = [
            {
                'keyword': k['query'],
                'rank': round(k['avg_position'], 1),
                'impressions': k['impressions'],
                'estimated_ctr': k['ctr'],
                'clicks': k['clicks']
            }
            for k in keywords
            if k['impressions'] >= IMP_MIN and k['ctr'] < CTR_MIN
        ]
        worst_by_position = [
            {
                'keyword': k['query'],
                'rank': round(k['avg_position'], 1),
                'impressions': k['impressions'],
                'clicks': k['clicks'],
                'ctr': k['ctr']
            }
            for k in keywords
            if k['avg_position'] > 30 and k['impressions'] >= IMP_MIN
        ]
        # Worst first: ascending CTR, descending position
        worst_by_ctr.sort(key=lambda x: x['estimated_ctr'])
        worst_by_position.sort(key=lambda x: x['rank'], reverse=True)
        return {
            'by_ctr': worst_by_ctr[:20],
            'by_position': worst_by_position[:20]
        }

    def _calculate_gsc_opportunity_score(self, keyword: Dict) -> float:
        """Score a keyword's improvement opportunity (0–100-ish scale).

        Averages three sub-scores: impression volume (capped at 100),
        CTR headroom (below a 5% baseline), and proximity to page one.
        """
        impressions = keyword['impressions']
        ctr = keyword['ctr']
        position = keyword['avg_position']
        # Higher impressions = more opportunity (10 pts per 1k, capped)
        impression_score = min(100, impressions / 1000 * 10)
        # Lower CTR = more room for improvement
        ctr_score = max(0, 5 - ctr) * 10
        # Closer to the first page = more opportunity
        position_score = max(0, 50 - position)
        return round((impression_score + ctr_score + position_score) / 3, 1)