import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import re
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
import xml.etree.ElementTree as ET

from utils import safe_pct
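# NOTE (assumption): safe_pct is defined in utils and is not shown here; it is
# presumed to return part/whole expressed as a percentage and to return 0 when
# the denominator is 0.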

class ContentAuditModule:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # CTA keywords to look for
        self.cta_keywords = [
            'contact', 'download', 'subscribe', 'buy', 'purchase', 'order',
            'register', 'sign up', 'get started', 'learn more', 'book now',
            'free trial', 'demo', 'consultation', 'quote', 'call now'
        ]
    def analyze(self, url: str, quick_scan: bool = False) -> Dict[str, Any]:
        """
        Perform a content audit for a given URL.

        Args:
            url: Website URL to analyze
            quick_scan: If True, perform limited analysis (for competitors)

        Returns:
            Dictionary containing content audit metrics
        """
        try:
            # Normalize URL
            if not url.startswith(('http://', 'https://')):
                url = 'https://' + url

            # Get sitemap URLs
            sitemap_urls = self._get_sitemap_urls(url, limit=50 if quick_scan else 200)

            # If no sitemap, crawl from homepage
            if not sitemap_urls:
                sitemap_urls = self._crawl_from_homepage(url, limit=20 if quick_scan else 50)

            # Analyze pages
            pages_analyzed = []
            for page_url in sitemap_urls[:20 if quick_scan else 200]:
                page_data = self._analyze_page(page_url)
                if page_data:
                    pages_analyzed.append(page_data)

            # Calculate aggregate metrics
            return self._calculate_metrics(url, pages_analyzed, quick_scan)
        except Exception as e:
            return self._get_fallback_data(url, str(e))
    def _get_sitemap_urls(self, base_url: str, limit: int = 200) -> List[str]:
        urls = []

        # Common sitemap locations
        sitemap_locations = [
            f"{base_url}/sitemap.xml",
            f"{base_url}/sitemap_index.xml",
            f"{base_url}/sitemaps/sitemap.xml"
        ]

        for sitemap_url in sitemap_locations:
            try:
                response = self.session.get(sitemap_url, timeout=10)
                if response.status_code == 200:
                    urls.extend(self._parse_sitemap(response.content, base_url, limit))
                    break
            except requests.RequestException:
                continue

        return urls[:limit]
    def _parse_sitemap(self, sitemap_content: bytes, base_url: str, limit: int) -> List[str]:
        urls = []
        ns = '{http://www.sitemaps.org/schemas/sitemap/0.9}'

        try:
            root = ET.fromstring(sitemap_content)

            # Handle sitemap index files: recursively parse each sub-sitemap
            for sitemap_elem in root.findall(f'.//{ns}sitemap'):
                loc_elem = sitemap_elem.find(f'{ns}loc')
                if loc_elem is not None and len(urls) < limit:
                    try:
                        response = self.session.get(loc_elem.text, timeout=10)
                        if response.status_code == 200:
                            sub_urls = self._parse_sitemap(response.content, base_url, limit - len(urls))
                            urls.extend(sub_urls)
                    except requests.RequestException:
                        continue

            # Handle direct URL entries
            for url_elem in root.findall(f'.//{ns}url'):
                if len(urls) >= limit:
                    break
                loc_elem = url_elem.find(f'{ns}loc')
                if loc_elem is not None:
                    url = loc_elem.text
                    if self._is_valid_content_url(url):
                        urls.append(url)
        except ET.ParseError:
            pass

        return urls[:limit]
    def _crawl_from_homepage(self, base_url: str, limit: int = 50) -> List[str]:
        urls = {base_url}

        try:
            response = self.session.get(base_url, timeout=10)
            if response.status_code == 200:
                soup = BeautifulSoup(response.content, 'html.parser')

                # Find all internal links
                for link in soup.find_all('a', href=True):
                    if len(urls) >= limit:
                        break
                    full_url = urljoin(base_url, link['href'])
                    if self._is_same_domain(full_url, base_url) and self._is_valid_content_url(full_url):
                        urls.add(full_url)
        except requests.RequestException:
            pass

        return list(urls)[:limit]
    def _analyze_page(self, url: str) -> Optional[Dict[str, Any]]:
        try:
            response = self.session.get(url, timeout=15)
            if response.status_code != 200:
                return None

            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract metadata
            title = soup.find('title')
            title_text = title.text.strip() if title else ""

            meta_description = soup.find('meta', attrs={'name': 'description'})
            description_text = meta_description.get('content', '').strip() if meta_description else ""

            # H1 tags
            h1_tags = soup.find_all('h1')
            h1_text = [h1.text.strip() for h1 in h1_tags]

            # Word count (main content)
            content_text = self._extract_main_content(soup)
            word_count = len(content_text.split()) if content_text else 0

            # CTA presence
            has_cta = self._detect_cta(soup)

            # Last modified (if available)
            last_modified = self._get_last_modified(response.headers, soup)

            # hreflang detection
            hreflang_data = self._detect_hreflang(soup)

            return {
                'url': url,
                'title': title_text,
                'title_length': len(title_text),
                'meta_description': description_text,
                'description_length': len(description_text),
                'h1_tags': h1_text,
                'h1_count': len(h1_text),
                'word_count': word_count,
                'has_cta': has_cta,
                'last_modified': last_modified,
                'hreflang_data': hreflang_data,
                'status_code': response.status_code
            }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'status_code': 0
            }
    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        """Extract main content text from HTML"""
        # Remove script, style, and boilerplate navigation elements
        for element in soup(["script", "style", "nav", "header", "footer"]):
            element.decompose()

        # Prefer an explicit main-content container when one exists
        main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile(r'content|main|body'))
        if main_content:
            return main_content.get_text()
        return soup.get_text()
    def _detect_cta(self, soup: BeautifulSoup) -> bool:
        text_content = soup.get_text().lower()
        for keyword in self.cta_keywords:
            if keyword in text_content:
                return True

        # Check for buttons and links with CTA-like text
        for element in soup.find_all(['button', 'a']):
            element_text = element.get_text().lower()
            for keyword in self.cta_keywords:
                if keyword in element_text:
                    return True

        return False
    def _get_last_modified(self, headers: Dict, soup: BeautifulSoup) -> str:
        # Check response headers first
        if 'last-modified' in headers:
            return headers['last-modified']

        # Fall back to meta tags
        meta_modified = soup.find('meta', attrs={'name': 'last-modified'}) or \
                        soup.find('meta', attrs={'property': 'article:modified_time'})
        if meta_modified:
            return meta_modified.get('content', '')

        return ""
    def _detect_hreflang(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Detect hreflang implementation on a page"""
        links = soup.find_all("link", rel="alternate")

        hreflangs = []
        for link in links:
            hreflang = link.get("hreflang")
            if hreflang:
                hreflangs.append({
                    'hreflang': hreflang,
                    'href': link.get('href', '')
                })

        has_x_default = any(h['hreflang'] == 'x-default' for h in hreflangs)

        return {
            'has_hreflang': len(hreflangs) > 0,
            'tags': hreflangs,
            'count': len(hreflangs),
            'has_x_default': has_x_default
        }
    def _extract_stale_pages(self, pages_data: List[Dict]) -> List[Dict[str, Any]]:
        """Extract pages that are 18+ months old"""
        eighteen_months_ago = datetime.now() - timedelta(days=540)
        stale_pages = []

        for page in pages_data:
            last_modified = page.get('last_modified', '')
            if not last_modified:
                continue

            try:
                # Parse the common date formats (HTTP-date or ISO 8601)
                if 'GMT' in last_modified:
                    modified_date = datetime.strptime(last_modified, '%a, %d %b %Y %H:%M:%S GMT')
                else:
                    # ISO format; drop any timezone info so the comparison with
                    # the naive cutoff datetime does not raise TypeError
                    modified_date = datetime.fromisoformat(last_modified.replace('Z', '+00:00')).replace(tzinfo=None)

                if modified_date <= eighteen_months_ago:
                    stale_pages.append({
                        'url': page.get('url', ''),
                        'last_modified': last_modified
                    })
            except (ValueError, TypeError):
                continue

        # Sort by oldest first and limit to 200
        stale_pages.sort(key=lambda x: x['last_modified'])
        return stale_pages[:200]
    def _analyze_hreflang(self, pages_data: List[Dict]) -> Dict[str, Any]:
        """Analyze hreflang implementation across the site"""
        pages_with_hreflang = 0
        sample_pages = []

        for page in pages_data:
            hreflang_data = page.get('hreflang_data', {})
            if hreflang_data.get('has_hreflang', False):
                pages_with_hreflang += 1

                # Collect samples (up to 5)
                if len(sample_pages) < 5:
                    sample_pages.append({
                        'url': page.get('url', ''),
                        'tags': [tag['hreflang'] for tag in hreflang_data.get('tags', [])]
                    })

        total_pages = len(pages_data)
        site_pct = safe_pct(pages_with_hreflang, total_pages)

        return {
            'site_pct': site_pct,
            'samples': sample_pages,
            'pages_with_hreflang': pages_with_hreflang,
            'total_pages_checked': total_pages
        }
    def _is_valid_content_url(self, url: str) -> bool:
        if not url:
            return False

        # Skip non-content URLs
        skip_extensions = ['.pdf', '.jpg', '.png', '.gif', '.css', '.js', '.xml']
        skip_paths = ['/wp-admin/', '/admin/', '/api/', '/feed/']

        url_lower = url.lower()
        for ext in skip_extensions:
            if url_lower.endswith(ext):
                return False
        for path in skip_paths:
            if path in url_lower:
                return False

        return True
    def _is_same_domain(self, url1: str, url2: str) -> bool:
        try:
            return urlparse(url1).netloc == urlparse(url2).netloc
        except ValueError:
            return False
    def _calculate_metrics(self, base_url: str, pages_data: List[Dict], quick_scan: bool) -> Dict[str, Any]:
        total_pages = len(pages_data)
        valid_pages = [p for p in pages_data if 'error' not in p]

        if not valid_pages:
            return self._get_fallback_data(base_url, "No valid pages found")

        # Title metrics
        pages_with_title = len([p for p in valid_pages if p.get('title')])
        avg_title_length = sum(p.get('title_length', 0) for p in valid_pages) / len(valid_pages)

        # Meta description metrics
        pages_with_description = len([p for p in valid_pages if p.get('meta_description')])
        avg_description_length = sum(p.get('description_length', 0) for p in valid_pages) / len(valid_pages)

        # H1 metrics
        pages_with_h1 = len([p for p in valid_pages if p.get('h1_count', 0) > 0])

        # Word count metrics
        word_counts = [p.get('word_count', 0) for p in valid_pages if p.get('word_count', 0) > 0]
        avg_word_count = sum(word_counts) / len(word_counts) if word_counts else 0

        # CTA metrics
        pages_with_cta = len([p for p in valid_pages if p.get('has_cta')])

        # Content freshness
        freshness_data = self._analyze_content_freshness(valid_pages)

        # Extract stale pages (18+ months old)
        stale_pages = self._extract_stale_pages(valid_pages)

        # hreflang analysis
        hreflang_analysis = self._analyze_hreflang(valid_pages)

        # Metadata completeness: share of the three metadata slots
        # (title, description, H1) filled across all analyzed pages
        meta_complete_pct = safe_pct(pages_with_title + pages_with_description + pages_with_h1, len(valid_pages) * 3)

        return {
            'url': base_url,
            'total_pages_discovered': total_pages,
            'pages_analyzed': len(valid_pages),
            'meta_complete_pct': meta_complete_pct,
            'avg_words': round(avg_word_count, 0),
            'metadata_completeness': {
                'title_coverage': safe_pct(pages_with_title, len(valid_pages)),
                'description_coverage': safe_pct(pages_with_description, len(valid_pages)),
                'h1_coverage': safe_pct(pages_with_h1, len(valid_pages)),
                'avg_title_length': round(avg_title_length, 1),
                'avg_description_length': round(avg_description_length, 1)
            },
            'content_metrics': {
                'avg_word_count': round(avg_word_count, 0),
                'cta_coverage': safe_pct(pages_with_cta, len(valid_pages))
            },
            'content_freshness': freshness_data,
            'stale_pages': stale_pages,
            'hreflang': hreflang_analysis,
            'data_source': 'Site crawl',
            'quick_scan': quick_scan
        }
    def _analyze_content_freshness(self, pages_data: List[Dict]) -> Dict[str, Any]:
        now = datetime.now()
        six_months_ago = now - timedelta(days=180)
        eighteen_months_ago = now - timedelta(days=540)

        fresh_count = 0
        moderate_count = 0
        stale_count = 0
        unknown_count = 0

        for page in pages_data:
            last_modified = page.get('last_modified', '')
            if not last_modified:
                unknown_count += 1
                continue

            try:
                # Parse the common date formats (HTTP-date or ISO 8601)
                if 'GMT' in last_modified:
                    modified_date = datetime.strptime(last_modified, '%a, %d %b %Y %H:%M:%S GMT')
                else:
                    # ISO format; drop any timezone info so the comparison with
                    # the naive cutoff datetimes does not raise TypeError
                    modified_date = datetime.fromisoformat(last_modified.replace('Z', '+00:00')).replace(tzinfo=None)

                if modified_date >= six_months_ago:
                    fresh_count += 1
                elif modified_date >= eighteen_months_ago:
                    moderate_count += 1
                else:
                    stale_count += 1
            except (ValueError, TypeError):
                unknown_count += 1

        total = len(pages_data)
        return {
            'fresh_content': {'count': fresh_count, 'percentage': safe_pct(fresh_count, total)},
            'moderate_content': {'count': moderate_count, 'percentage': safe_pct(moderate_count, total)},
            'stale_content': {'count': stale_count, 'percentage': safe_pct(stale_count, total)},
            'unknown_date': {'count': unknown_count, 'percentage': safe_pct(unknown_count, total)}
        }
    def _get_fallback_data(self, url: str, error: str) -> Dict[str, Any]:
        return {
            'url': url,
            'error': f"Content audit failed: {error}",
            'total_pages_discovered': 0,
            'pages_analyzed': 0,
            'metadata_completeness': {
                'title_coverage': 0,
                'description_coverage': 0,
                'h1_coverage': 0,
                'avg_title_length': 0,
                'avg_description_length': 0
            },
            'content_metrics': {
                'avg_word_count': 0,
                'cta_coverage': 0
            },
            'content_freshness': {
                'fresh_content': {'count': 0, 'percentage': 0},
                'moderate_content': {'count': 0, 'percentage': 0},
                'stale_content': {'count': 0, 'percentage': 0},
                'unknown_date': {'count': 0, 'percentage': 0}
            },
            'stale_pages': [],
            'hreflang': {'site_pct': 0, 'samples': []},
            'data_source': 'Site crawl',
            'meta_complete_pct': 0,
            'avg_words': 0,
            'quick_scan': False
        }
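

# ---------------------------------------------------------------------------
# Example usage (illustrative sketch, not part of the original module): run the
# audit against a hypothetical domain and print a few of the aggregate metrics
# produced by _calculate_metrics(). A failed audit returns the fallback dict,
# which carries an 'error' key.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    auditor = ContentAuditModule()
    report = auditor.analyze("example.com", quick_scan=True)

    if 'error' in report:
        print(f"Audit fell back to defaults: {report['error']}")
    print(f"Pages analyzed:        {report.get('pages_analyzed', 0)}")
    print(f"Metadata completeness: {report.get('meta_complete_pct', 0)}%")
    print(f"Average word count:    {report.get('avg_words', 0)}")
    print(f"Stale pages found:     {len(report.get('stale_pages', []))}")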