import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import chardet
import io
import csv
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple, Any
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
from diskcache import Cache
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import tarfile
import gzip
import math
import random
import pandas as pd
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Set up logging with detailed formatting (console and file handlers)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)
# Conditional imports for document processing
try:
    from PyPDF2 import PdfReader
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False
    logger.warning("PyPDF2 not installed. PDF file processing will be limited.")
try:
    from docx import Document
    DOCX_SUPPORT = True
except ImportError:
    DOCX_SUPPORT = False
    logger.warning("python-docx not installed. DOCX file processing will be limited.")
try:
    from pyth.plugins.plaintext.writer import PlaintextWriter
    from pyth.plugins.rtf15.reader import Rtf15Reader
    RTF_SUPPORT = True
except ImportError:
    RTF_SUPPORT = False
    logger.warning("pyth not installed. RTF file processing will be limited.")
try:
    # odfpy: load() opens an existing document; teletype.extractText() pulls text out of elements.
    from odf.opendocument import load as load_odf
    from odf import text as odftext
    from odf import teletype as odfteletype
    ODT_SUPPORT = True
except ImportError:
    ODT_SUPPORT = False
    logger.warning("odfpy not installed. ODT file processing will be limited.")
# Ensure output directories exist
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
    directory.mkdir(parents=True, exist_ok=True)

class EnhancedURLProcessor:
    """Advanced URL processing with enhanced content extraction and recursive link following."""

    def __init__(self):
        # Use a requests session with a retry strategy for transient errors
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.user_agent = UserAgent()
        self.timeout = 15  # seconds

    def validate_url(self, url: str) -> Dict[str, Any]:
        """Enhanced URL validation with accessibility check."""
        if not validators.url(url):
            return {'is_valid': False, 'message': 'Invalid URL format',
                    'details': 'URL must begin with http:// or https://'}
        parsed = urlparse(url)
        if not all([parsed.scheme, parsed.netloc]):
            return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'}
        try:
            # Use a HEAD request to check accessibility without downloading full content
            headers = {'User-Agent': self.user_agent.random}
            response = self.session.head(url, timeout=self.timeout, headers=headers, allow_redirects=True)
            response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
            # Check content type if available in the HEAD response
            content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
            if not content_type or not (
                    content_type.startswith('text/') or 'json' in content_type or 'xml' in content_type):
                # Content type does not look text-like; allow fetching anyway but log a warning.
                logger.warning(f"URL {url} returned potentially irrelevant content type: {content_type}")
            return {
                'is_valid': True,
                'message': 'URL is valid and accessible',
                'details': {
                    'final_url': response.url,  # Final URL after redirects
                    'content_type': content_type,
                    'server': response.headers.get('Server', 'N/A'),
                    'size': response.headers.get('Content-Length', 'N/A')
                }
            }
        except requests.exceptions.RequestException as e:
            return {'is_valid': False, 'message': 'URL not accessible', 'details': str(e)}
        except Exception as e:
            logger.error(f"Unexpected error during URL validation for {url}: {e}")
            return {'is_valid': False, 'message': 'Unexpected validation error', 'details': str(e)}

    def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict[str, Any]]:
        """Enhanced content fetcher with retry mechanism and complete character extraction."""
        try:
            logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1})")
            headers = {'User-Agent': self.user_agent.random}
            response = self.session.get(url, timeout=self.timeout, headers=headers, allow_redirects=True)
            response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
            final_url = response.url  # Capture potential redirects
            content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
            # Attempt to detect encoding if not specified in headers
            encoding = response.encoding  # requests attempts to guess encoding
            if encoding is None or encoding == 'ISO-8859-1':  # Fallback if requests' guess is the default/uncertain
                try:
                    encoding_detection = chardet.detect(response.content)
                    encoding = encoding_detection['encoding'] or 'utf-8'
                    logger.debug(f"Chardet detected encoding: {encoding} for {url}")
                except Exception as e:
                    logger.warning(f"Chardet detection failed for {url}: {e}. Falling back to utf-8.")
                    encoding = 'utf-8'
            raw_content = response.content.decode(encoding, errors='replace')
            # Extract metadata
            metadata = {
                'original_url': url,
                'final_url': final_url,
                'timestamp': datetime.now().isoformat(),
                'detected_encoding': encoding,
                'content_type': content_type,
                'content_length': len(response.content),
                'headers': dict(response.headers),
                'status_code': response.status_code
            }
            # Process based on content type
            processed_extraction = self._process_web_content(raw_content, metadata['content_type'], final_url)
            return {
                'source': 'url',
                'url': url,  # Keep original URL as identifier for this step
                'raw_content': raw_content,
                'metadata': metadata,
                'extracted_data': processed_extraction['data'],
                'processing_notes': processed_extraction['notes']
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to fetch content from {url}: {e}")
            return {
                'source': 'url',
                'url': url,
                'raw_content': None,
                'metadata': {'original_url': url, 'timestamp': datetime.now().isoformat(),
                             'status_code': getattr(e.response, 'status_code', None)},
                'extracted_data': None,
                'processing_notes': [f"Failed to fetch content: {str(e)}"]
            }
        except Exception as e:
            logger.error(f"Unexpected error while fetching or processing URL {url}: {e}")
            return {
                'source': 'url',
                'url': url,
                'raw_content': raw_content if 'raw_content' in locals() else None,
                'metadata': metadata if 'metadata' in locals() else {'original_url': url,
                                                                     'timestamp': datetime.now().isoformat(),
                                                                     'status_code': None},
                'extracted_data': None,
                'processing_notes': [f"Unexpected processing error: {str(e)}"]
            }

    def _process_web_content(self, content: str, content_type: str, base_url: str) -> Dict[str, Any]:
        """Process content based on detected content type."""
        lower_content_type = content_type.lower()
        notes = []
        extracted_data: Any = None
        try:
            if 'text/html' in lower_content_type:
                logger.debug(f"Processing HTML content from {base_url}")
                extracted_data = self._process_html_content_enhanced(content, base_url)
                notes.append("Processed as HTML")
            elif 'application/json' in lower_content_type or 'text/json' in lower_content_type:
                logger.debug(f"Processing JSON content from {base_url}")
                try:
                    extracted_data = json.loads(content)
                    notes.append("Parsed as JSON")
                except json.JSONDecodeError as e:
                    extracted_data = content
                    notes.append(f"Failed to parse as JSON: {e}")
                    logger.warning(f"Failed to parse JSON from {base_url}: {e}")
                except Exception as e:
                    extracted_data = content
                    notes.append(f"Error processing JSON: {e}")
                    logger.error(f"Error processing JSON from {base_url}: {e}")
            elif ('application/xml' in lower_content_type or 'text/xml' in lower_content_type
                  or lower_content_type.endswith('+xml')):
                logger.debug(f"Processing XML content from {base_url}")
                try:
                    root = ET.fromstring(content)
                    xml_text = ET.tostring(root, encoding='unicode', method='xml')
                    extracted_data = xml_text
                    notes.append("Parsed as XML (text representation)")
                except ET.ParseError as e:
                    extracted_data = content
                    notes.append(f"Failed to parse as XML: {e}")
                    logger.warning(f"Failed to parse XML from {base_url}: {e}")
                except Exception as e:
                    extracted_data = content
                    notes.append(f"Error processing XML: {e}")
                    logger.error(f"Error processing XML from {base_url}: {e}")
            elif 'text/plain' in lower_content_type or 'text/' in lower_content_type:
                logger.debug(f"Processing plain text content from {base_url}")
                extracted_data = content
                notes.append("Processed as Plain Text")
            else:
                logger.debug(f"Unknown content type '{content_type}' from {base_url}. Storing raw content.")
                extracted_data = content
                notes.append(f"Unknown content type '{content_type}'. Stored raw text.")
        except Exception as e:
            logger.error(f"Unexpected error in _process_web_content for {base_url} ({content_type}): {e}")
            extracted_data = content
            notes.append(f"Unexpected processing error: {e}. Stored raw text.")
        return {'data': extracted_data, 'notes': notes}

    def _process_html_content_enhanced(self, content: str, base_url: str) -> Dict[str, Any]:
        """Process HTML content, preserving text and extracting metadata, links, images, and media."""
        extracted: Dict[str, Any] = {
            'title': None,
            'meta_description': None,
            'full_text': "",
            'links': [],
            'images': [],
            'media': []
        }
        try:
            soup = BeautifulSoup(content, 'html.parser')
            if soup.title and soup.title.string:
                extracted['title'] = soup.title.string.strip()
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and meta_desc.get('content'):
                extracted['meta_description'] = meta_desc['content'].strip()
            # Extract links
            unique_links = set()
            for a_tag in soup.find_all('a', href=True):
                href = a_tag['href'].strip()
                if href and not href.startswith(('#', 'mailto:', 'tel:', 'javascript:')):
                    text = a_tag.get_text().strip()
                    try:
                        absolute_url = urljoin(base_url, href)
                        if absolute_url not in unique_links:
                            extracted['links'].append({'text': text, 'url': absolute_url})
                            unique_links.add(absolute_url)
                    except Exception:
                        if validators.url(href) and href not in unique_links:
                            extracted['links'].append({'text': text, 'url': href})
                            unique_links.add(href)
                        elif urlparse(href).netloc and href not in unique_links:
                            extracted['links'].append({'text': text, 'url': href})
                            unique_links.add(href)
            # Extract images
            unique_images = set()
            for img_tag in soup.find_all('img', src=True):
                src = img_tag['src'].strip()
                alt = img_tag.get('alt', '').strip()
                if src and src not in unique_images:
                    absolute_url = urljoin(base_url, src)
                    extracted['images'].append({'src': absolute_url, 'alt': alt})
                    unique_images.add(src)
            # Extract media (audio/video)
            unique_media = set()
            for media_tag in soup.find_all(['audio', 'video'], src=True):
                src = media_tag['src'].strip()
                if src and src not in unique_media:
                    absolute_url = urljoin(base_url, src)
                    extracted['media'].append({'src': absolute_url, 'type': media_tag.name})
                    unique_media.add(src)
            # Extract text content (drop script/style)
            soup_copy = BeautifulSoup(content, 'html.parser')
            for script_or_style in soup_copy(["script", "style"]):
                script_or_style.extract()
            text = soup_copy.get_text(separator='\n')
            lines = text.splitlines()
            cleaned_lines = [line.strip() for line in lines if line.strip()]
            extracted['full_text'] = '\n'.join(cleaned_lines)
        except Exception as e:
            logger.error(f"Enhanced HTML processing error for {base_url}: {e}")
            soup_copy = BeautifulSoup(content, 'html.parser')
            for script_or_style in soup_copy(["script", "style"]):
                script_or_style.extract()
            extracted['full_text'] = soup_copy.get_text(separator='\n').strip()
            extracted['processing_error'] = f"Enhanced HTML processing failed: {e}"
        return extracted

    def fetch_content_with_depth(self, url: str, max_steps: int = 0) -> Dict[str, Any]:
        """Fetch content from a URL and recursively follow links up to max_steps depth."""
        if not isinstance(max_steps, int) or not (0 <= max_steps <= 10):
            logger.error(f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10.")
            return {
                'url': url,
                'level': 0,
                'fetch_result': None,
                'linked_extractions': [],
                'processing_notes': [f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10."]
            }
        validation_result = self.validate_url(url)
        if not validation_result['is_valid']:
            logger.error(f"Initial URL validation failed for {url}: {validation_result['message']}")
            return {
                'url': url,
                'level': 0,
                'fetch_result': None,
                'linked_extractions': [],
                'processing_notes': [f"Initial URL validation failed: {validation_result['message']}"]
            }
        # Track visited URLs during the crawl to avoid infinite loops
        visited_urls = set()
        return self._fetch_content_recursive(url, max_steps, current_step=0, visited_urls=visited_urls)

    def _fetch_content_recursive(self, url: str, max_steps: int, current_step: int,
                                 visited_urls: set) -> Dict[str, Any]:
        """Recursive helper to fetch content and follow links."""
        if current_step > max_steps:
            logger.debug(f"Depth limit ({max_steps}) reached for {url} at level {current_step}.")
            return {
                'url': url,
                'level': current_step,
                'fetch_result': None,
                'linked_extractions': [],
                'processing_notes': [f"Depth limit ({max_steps}) reached."]
            }
        # Normalize URL before checking the visited set
        normalized_url = url.rstrip('/')  # Simple normalization
        if normalized_url in visited_urls:
            logger.debug(f"Skipping already visited URL: {url} at level {current_step}.")
            return {
                'url': url,
                'level': current_step,
                'fetch_result': None,  # Indicate not fetched in this run
                'linked_extractions': [],
                'processing_notes': ["URL already visited in this crawl."]
            }
        visited_urls.add(normalized_url)  # Mark as visited
        logger.info(f"Processing URL: {url} at level {current_step}/{max_steps}")
        fetch_result = self.fetch_content(url)
        linked_extractions: List[Dict[str, Any]] = []
        if fetch_result and fetch_result.get('extracted_data') and 'text/html' in fetch_result.get(
                'metadata', {}).get('content_type', '').lower():
            extracted_data = fetch_result['extracted_data']
            links = extracted_data.get('links', [])
            logger.info(
                f"Found {len(links)} potential links on {url} at level {current_step}. Proceeding to depth {current_step + 1}.")
            if current_step < max_steps:
                for link_info in links:
                    linked_url = link_info.get('url')
                    if linked_url:
                        # Ensure the linked URL is absolute and within the same domain
                        # (simple same-domain check; can be made more sophisticated)
                        try:
                            base_domain = urlparse(url).netloc
                            linked_domain = urlparse(linked_url).netloc
                            if linked_domain and linked_domain != base_domain:
                                logger.debug(f"Skipping external link: {linked_url}")
                                continue  # Skip external links
                            # Recursively fetch linked URLs
                            linked_result = self._fetch_content_recursive(linked_url, max_steps, current_step + 1,
                                                                          visited_urls)
                            if linked_result:
                                linked_extractions.append(linked_result)
                        except Exception as e:
                            logger.warning(f"Error processing linked URL {linked_url} from {url}: {e}")
        current_notes = fetch_result.get('processing_notes', []) if fetch_result else ['Fetch failed.']
        if f"Processed at level {current_step}" not in current_notes:
            current_notes.append(f"Processed at level {current_step}")
        return {
            'url': url,
            'level': current_step,
            'fetch_result': fetch_result,
            'linked_extractions': linked_extractions,
            'processing_notes': current_notes
        }
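
# Illustrative usage sketch (not invoked by the app; the URL below is a placeholder):
# crawling one level deep returns a nested result tree whose 'linked_extractions' entries
# have the same shape as the top-level result.
#
#   processor = EnhancedURLProcessor()
#   result = processor.fetch_content_with_depth("https://example.com", max_steps=1)
#   # For HTML pages, extracted_data is the dict built by _process_html_content_enhanced:
#   page_text = result['fetch_result']['extracted_data']['full_text']
#   child_urls = [child['url'] for child in result['linked_extractions']]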

class EnhancedFileProcessor:
    """Advanced file processing with enhanced content extraction."""

    def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024):  # 5GB default
        self.max_file_size = max_file_size
        self.supported_extensions = {
            '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm',
            '.log', '.yml', '.yaml', '.ini', '.conf', '.cfg',
            '.pdf', '.doc', '.docx', '.rtf', '.odt',
            '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar',
        }
        self.archive_extensions = {'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'}

    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file with enhanced error handling and complete extraction."""
        if not file or not hasattr(file, 'name'):
            logger.warning("Received invalid file object.")
            return []
        dataset = []
        file_path = Path(file.name)
        if not file_path.exists():
            logger.error(f"File path does not exist: {file_path}")
            return [{
                'source': 'file',
                'filename': file.name if hasattr(file, 'name') else 'unknown',
                'file_size': None,
                'extracted_data': None,
                'processing_notes': ['File path does not exist.']
            }]
        try:
            file_size = file_path.stat().st_size
            if file_size > self.max_file_size:
                logger.warning(
                    f"File '{file_path.name}' size ({file_size} bytes) exceeds maximum allowed size ({self.max_file_size} bytes).")
                return [{
                    'source': 'file',
                    'filename': file_path.name,
                    'file_size': file_size,
                    'extracted_data': None,
                    'processing_notes': ['File size exceeds limit.']
                }]
            with tempfile.TemporaryDirectory() as temp_dir:
                temp_dir_path = Path(temp_dir)
                if file_path.suffix.lower() in self.archive_extensions:
                    dataset.extend(self._process_archive(file_path, temp_dir_path))
                elif file_path.suffix.lower() in self.supported_extensions:
                    dataset.extend(self._process_single_file(file_path))
                else:
                    logger.warning(
                        f"Unsupported file type for processing: '{file_path.name}'. Attempting to read as plain text.")
                    try:
                        content_bytes = file_path.read_bytes()
                        encoding_detection = chardet.detect(content_bytes)
                        encoding = encoding_detection['encoding'] or 'utf-8'
                        raw_content = content_bytes.decode(encoding, errors='replace')
                        dataset.append({
                            'source': 'file',
                            'filename': file_path.name,
                            'file_size': file_size,
                            'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown',
                            'extracted_data': {'plain_text': raw_content},
                            'processing_notes': ['Processed as plain text (unsupported extension).']
                        })
                    except Exception as e:
                        logger.error(f"Error reading or processing unsupported file '{file_path.name}' as text: {e}")
                        dataset.append({
                            'source': 'file',
                            'filename': file_path.name,
                            'file_size': file_size,
                            'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown',
                            'extracted_data': None,
                            'processing_notes': [f'Unsupported file type and failed to read as text: {e}']
                        })
        except Exception as e:
            logger.error(f"Error processing file '{file_path.name}': {str(e)}")
            dataset.append({
                'source': 'file',
                'filename': file_path.name,
                'file_size': file_size if 'file_size' in locals() else None,
                'extracted_data': None,
                'processing_notes': [f'Overall file processing error: {str(e)}']
            })
        return dataset

    def _is_archive(self, filepath: Union[str, Path]) -> bool:
        """Check whether a file is an archive."""
        p = Path(filepath) if isinstance(filepath, str) else filepath
        return p.suffix.lower() in self.archive_extensions

    def _process_single_file(self, file_path: Path) -> List[Dict]:
        """Process a single file with enhanced character extraction and format-specific handling."""
        dataset_entries = []
        filename = file_path.name
        file_size = file_path.stat().st_size
        mime_type, _ = mimetypes.guess_type(file_path)
        mime_type = mime_type or 'unknown/unknown'
        file_extension = file_path.suffix.lower()
        logger.info(f"Processing single file: '{filename}' ({mime_type}, {file_size} bytes)")
        raw_content: Optional[str] = None
        extracted_data: Any = None
        processing_notes: List[str] = []
        try:
            content_bytes = file_path.read_bytes()
            encoding_detection = chardet.detect(content_bytes)
            encoding = encoding_detection['encoding'] or 'utf-8'
            raw_content = content_bytes.decode(encoding, errors='replace')
            is_explicit_json = mime_type == 'application/json' or file_extension == '.json'
            looks_like_json = raw_content.strip().startswith('{') or raw_content.strip().startswith('[')
            if is_explicit_json or looks_like_json:
                try:
                    extracted_data = json.loads(raw_content)
                    processing_notes.append("Parsed as JSON.")
                    if not is_explicit_json:
                        processing_notes.append("Note: Content looked like JSON despite extension/mime.")
                        logger.warning(f"File '{filename}' identified as JSON content despite extension/mime.")
                    mime_type = 'application/json'
                except json.JSONDecodeError as e:
                    processing_notes.append(f"Failed to parse as JSON: {e}.")
                    if is_explicit_json:
                        logger.error(f"Explicit JSON file '{filename}' has invalid format: {e}")
                    else:
                        logger.warning(f"Content of '{filename}' looks like JSON but failed to parse: {e}")
                except Exception as e:
                    processing_notes.append(f"Error processing JSON: {e}.")
                    logger.error(f"Error processing JSON in '{filename}': {e}")
            looks_like_xml = extracted_data is None and raw_content.strip().startswith(
                '<') and raw_content.strip().endswith('>')
            is_explicit_xml = extracted_data is None and (
                    mime_type in ('application/xml', 'text/xml') or mime_type.endswith('+xml')
                    or file_extension in ('.xml', '.xsd'))
            if extracted_data is None and (is_explicit_xml or looks_like_xml):
                try:
                    root = ET.fromstring(raw_content)
                    extracted_data = ET.tostring(root, encoding='unicode', method='xml')
                    processing_notes.append("Parsed as XML (text representation).")
                    if not is_explicit_xml:
                        processing_notes.append("Note: Content looked like XML despite extension/mime.")
                    if 'xml' not in mime_type:
                        mime_type = 'application/xml'
                except ET.ParseError as e:
                    processing_notes.append(f"Failed to parse as XML: {e}.")
                    if is_explicit_xml:
                        logger.error(f"Explicit XML file '{filename}' has invalid format: {e}")
                    else:
                        logger.warning(f"Content of '{filename}' looks like XML but failed to parse: {e}")
                except Exception as e:
                    processing_notes.append(f"Error processing XML: {e}.")
                    logger.error(f"Error processing XML in '{filename}': {e}")
            is_explicit_csv = extracted_data is None and (mime_type == 'text/csv' or file_extension == '.csv')
            looks_like_csv = extracted_data is None and (',' in raw_content or ';' in raw_content) and (
                    '\n' in raw_content or len(raw_content.splitlines()) > 1)
            if extracted_data is None and (is_explicit_csv or looks_like_csv):
                try:
                    dialect = 'excel'
                    try:
                        sample = '\n'.join(raw_content.splitlines()[:10])
                        if sample:
                            # Sniffer returns a Dialect subclass (it has no registered name);
                            # csv.reader accepts it directly alongside name strings like 'excel'.
                            dialect = csv.Sniffer().sniff(sample)
                            logger.debug(
                                f"Sniffer detected CSV dialect (delimiter={dialect.delimiter!r}) for '{filename}'")
                    except csv.Error:
                        logger.debug(f"Sniffer failed to detect dialect for '{filename}', using 'excel'.")
                        dialect = 'excel'
                    csv_reader = csv.reader(io.StringIO(raw_content), dialect=dialect)
                    rows = list(csv_reader)
                    if rows:
                        max_rows_preview = 100
                        extracted_data = {
                            'headers': rows[0] if rows and rows[0] else None,
                            'rows': rows[1:max_rows_preview + 1] if len(rows) > 1 else []
                        }
                        if len(rows) > max_rows_preview + 1:
                            processing_notes.append(f"CSV data rows truncated to {max_rows_preview}.")
                        processing_notes.append("Parsed as CSV.")
                        if not is_explicit_csv:
                            processing_notes.append("Note: Content looked like CSV despite extension/mime.")
                        mime_type = 'text/csv'
                    else:
                        extracted_data = "Empty CSV"
                        processing_notes.append("Parsed as empty CSV.")
                        if not is_explicit_csv:
                            processing_notes.append("Note: Content looked like CSV but was empty.")
                except Exception as e:
                    processing_notes.append(f"Failed to parse as CSV: {e}.")
                    logger.warning(f"Failed to parse CSV from '{filename}': {e}")
            if extracted_data is None:
                try:
                    extracted_text = None
                    if file_extension == '.pdf' and PDF_SUPPORT:
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
                            tmp_file.write(content_bytes)
                            temp_path = Path(tmp_file.name)
                        try:
                            reader = PdfReader(temp_path)
                            text_content = "".join(page.extract_text() or "" for page in reader.pages)
                            extracted_text = text_content
                            processing_notes.append("Extracted text from PDF.")
                        finally:
                            if temp_path.exists():
                                temp_path.unlink()
                    elif file_extension == '.docx' and DOCX_SUPPORT:
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp_file:
                            tmp_file.write(content_bytes)
                            temp_path = Path(tmp_file.name)
                        try:
                            document = Document(temp_path)
                            text_content = "\n".join(paragraph.text for paragraph in document.paragraphs)
                            extracted_text = text_content
                            processing_notes.append("Extracted text from DOCX.")
                        finally:
                            if temp_path.exists():
                                temp_path.unlink()
                    elif file_extension == '.rtf' and RTF_SUPPORT:
                        try:
                            doc = Rtf15Reader.read(io.StringIO(raw_content))
                            text_content = PlaintextWriter.write(doc).getvalue()
                            extracted_text = text_content
                            processing_notes.append("Extracted text from RTF.")
                        except Exception as e:
                            processing_notes.append(f"RTF extraction error: {e}")
                            logger.warning(f"Failed to extract RTF text from '{filename}': {e}")
                    elif file_extension == '.odt' and ODT_SUPPORT:
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.odt') as tmp_file:
                            tmp_file.write(content_bytes)
                            temp_path = Path(tmp_file.name)
                        try:
                            # Load the existing document and extract paragraph text via odfpy's teletype helper.
                            text_doc = load_odf(str(temp_path))
                            paragraphs = text_doc.getElementsByType(odftext.P)
                            text_content = "\n".join(odfteletype.extractText(p) for p in paragraphs)
                            extracted_text = text_content
                            processing_notes.append("Extracted text from ODT.")
                        finally:
                            if temp_path.exists():
                                temp_path.unlink()
                    elif file_extension in ['.doc', '.ppt', '.pptx', '.xls', '.xlsx']:
                        processing_notes.append(
                            f"Automatic text extraction for {file_extension.upper()} not fully implemented.")
                        logger.warning(
                            f"Automatic text extraction for {file_extension.upper()} not fully implemented for '{filename}'.")
                    if extracted_text is not None:
                        max_extracted_text_size = 10000
                        extracted_data = {'text': extracted_text[:max_extracted_text_size]}
                        if len(extracted_text) > max_extracted_text_size:
                            extracted_data['text'] += "..."
                            processing_notes.append("Extracted text truncated.")
                except ImportError as e:
                    processing_notes.append(f"Missing dependency for document type ({e}). Cannot extract text.")
                except Exception as e:
                    processing_notes.append(f"Error during document text extraction: {e}")
                    logger.warning(f"Error during document text extraction for '{filename}': {e}")
            if extracted_data is None:
                extracted_data = {'plain_text': raw_content}
                processing_notes.append("Stored as plain text.")
                if mime_type in ['unknown/unknown', 'application/octet-stream']:
                    guessed_text_mime, _ = mimetypes.guess_type('dummy.txt')
                    if guessed_text_mime:
                        mime_type = guessed_text_mime
        except Exception as e:
            logger.error(f"Fatal error processing single file '{filename}': {e}")
            processing_notes.append(f"Fatal processing error: {e}")
            raw_content = None
            extracted_data = None
        entry = {
            'source': 'file',
            'filename': filename,
            'file_size': file_size,
            'mime_type': mime_type,
            'created': datetime.fromtimestamp(file_path.stat().st_ctime).isoformat() if file_path.exists() else None,
            'modified': datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() if file_path.exists() else None,
            'raw_content': raw_content,
            'extracted_data': extracted_data,
            'processing_notes': processing_notes
        }
        dataset_entries.append(entry)
        return dataset_entries

    def _process_archive(self, archive_path: Path, extract_to: Path) -> List[Dict]:
        """Process an archive file with enhanced extraction."""
        dataset = []
        archive_extension = archive_path.suffix.lower()
        logger.info(f"Processing archive: '{archive_path.name}'")
        try:
            if archive_extension == '.zip':
                if zipfile.is_zipfile(archive_path):
                    with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                        for file_info in zip_ref.infolist():
                            if file_info.file_size > 0 and not file_info.filename.endswith('/'):
                                sanitized_filename = Path(file_info.filename).name
                                extracted_file_path = extract_to / sanitized_filename
                                try:
                                    with zip_ref.open(file_info) as zf, open(extracted_file_path, 'wb') as outfile:
                                        outfile.write(zf.read())
                                    if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(
                                            extracted_file_path):
                                        dataset.extend(self._process_single_file(extracted_file_path))
                                    elif extracted_file_path.suffix.lower() in self.archive_extensions:
                                        logger.info(
                                            f"Found nested archive '{file_info.filename}', processing recursively.")
                                        dataset.extend(self._process_archive(extracted_file_path, extract_to))
                                    else:
                                        logger.debug(f"Skipping unsupported file in archive: '{file_info.filename}'")
                                except Exception as e:
                                    logger.warning(
                                        f"Error extracting/processing file '{file_info.filename}' from zip '{archive_path.name}': {e}")
                                finally:
                                    if extracted_file_path.exists():
                                        try:
                                            extracted_file_path.unlink()
                                        except OSError as e:
                                            logger.warning(
                                                f"Failed to clean up extracted file {extracted_file_path}: {e}")
                else:
                    logger.error(f"'{archive_path.name}' is not a valid zip file.")
            elif archive_extension in ('.tar', '.tgz') or (
                    archive_extension == '.gz' and tarfile.is_tarfile(archive_path)):
                try:
                    # Note: Path.suffix of 'foo.tar.gz' is '.gz', so gzip-compressed tarballs land here too.
                    mode = 'r:gz' if archive_extension in ('.tgz', '.gz') else 'r'
                    with tarfile.open(archive_path, mode) as tar_ref:
                        for member in tar_ref.getmembers():
                            if member.isfile():
                                sanitized_filename = Path(member.name).name
                                extracted_file_path = extract_to / sanitized_filename
                                try:
                                    if not str(extracted_file_path).startswith(str(extract_to)):
                                        logger.warning(f"Skipping potentially malicious path in tar: {member.name}")
                                        continue
                                    with tar_ref.extractfile(member) as tf, open(extracted_file_path, 'wb') as outfile:
                                        if tf:
                                            outfile.write(tf.read())
                                        else:
                                            logger.warning(
                                                f"Could not extract file-like object for {member.name} from tar.")
                                            continue
                                    if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(
                                            extracted_file_path):
                                        dataset.extend(self._process_single_file(extracted_file_path))
                                    elif extracted_file_path.suffix.lower() in self.archive_extensions:
                                        logger.info(f"Found nested archive '{member.name}', processing recursively.")
                                        dataset.extend(self._process_archive(extracted_file_path, extract_to))
                                    else:
                                        logger.debug(f"Skipping unsupported file in archive: '{member.name}'")
                                except Exception as e:
                                    logger.warning(
                                        f"Error extracting/processing file '{member.name}' from tar '{archive_path.name}': {e}")
                                finally:
                                    if extracted_file_path.exists():
                                        try:
                                            extracted_file_path.unlink()
                                        except OSError as e:
                                            logger.warning(
                                                f"Failed to clean up extracted file {extracted_file_path}: {e}")
                except tarfile.TarError as e:
                    logger.error(f"Error processing TAR archive '{archive_path.name}': {e}")
            elif archive_extension == '.gz':  # Plain gzip-compressed single file (not a tar archive)
                extracted_name = archive_path.stem
                extracted_path = extract_to / extracted_name
                try:
                    with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile:
                        outfile.write(gz_file.read())
                    if extracted_path.suffix.lower() in self.supported_extensions and not self._is_archive(
                            extracted_path):
                        dataset.extend(self._process_single_file(extracted_path))
                    elif extracted_path.suffix.lower() in self.archive_extensions:
                        logger.info(f"Found nested archive '{extracted_name}', processing recursively.")
                        dataset.extend(self._process_archive(extracted_path, extract_to))
                    else:
                        logger.debug(f"Skipping unsupported file (from gz): '{extracted_name}'")
                except OSError as e:  # Includes gzip.BadGzipFile
                    logger.error(f"Error processing GZIP file '{archive_path.name}': {e}")
                except Exception as e:
                    logger.error(f"Error extracting/processing from GZIP '{archive_path.name}': {e}")
                finally:
                    if extracted_path.exists():
                        try:
                            extracted_path.unlink()
                        except OSError as e:
                            logger.warning(f"Failed to clean up extracted file {extracted_path}: {e}")
            elif archive_extension in ('.bz2', '.7z', '.rar'):
                logger.warning(
                    f"Support for {archive_extension} archives is not yet fully implemented and requires external tools/libraries.")
        except Exception as e:
            logger.error(f"Overall archive processing error for '{archive_path.name}': {e}")
        return dataset

    def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[str]:
        """
        Enhanced data chunking for QR codes with sequence metadata and start/end tags.
        max_size is the maximum *byte* capacity of a QR code (e.g., 2953 bytes for Version 40-L).
        """
        try:
            json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
            json_bytes = json_str.encode('utf-8')
            total_bytes_length = len(json_bytes)
            MAX_OVERHEAD_PER_CHUNK_BYTES = 250
            # Cap the per-code capacity at a conservative 2900 bytes, or lower if the caller requests less.
            PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY = min(max_size, 2900)
            effective_payload_bytes_per_chunk = PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY - MAX_OVERHEAD_PER_CHUNK_BYTES
            if effective_payload_bytes_per_chunk <= 0:
                logger.error(
                    f"Effective payload size is zero or negative. QR size ({PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY}) is too small for metadata overhead ({MAX_OVERHEAD_PER_CHUNK_BYTES}). Cannot chunk.")
                return []
            num_chunks = math.ceil(total_bytes_length / effective_payload_bytes_per_chunk) if total_bytes_length > 0 else 0
            if num_chunks == 0:
                return []
            chunks_for_qr: List[str] = []
            current_byte_pos = 0
            for i in range(num_chunks):
                end_byte_pos = min(current_byte_pos + effective_payload_bytes_per_chunk, total_bytes_length)
                chunk_data_bytes = json_bytes[current_byte_pos:end_byte_pos]
                chunk_data_str = chunk_data_bytes.decode('utf-8', errors='replace')
                chunk_dict = {
                    "idx": i + 1,
                    "tc": num_chunks,
                    "tl": total_bytes_length,
                    # Note: built-in hash() is salted per process, so this is only a within-run checksum.
                    "hash": hash(chunk_data_bytes) & 0xFFFFFFFF,
                    "data": chunk_data_str
                }
                inner_json_string = json.dumps(chunk_dict, ensure_ascii=False, separators=(',', ':'))
                final_qr_string = f"{{start{i + 1}}}{inner_json_string}{{end{i + 1}}}"
                encoded_final_qr_string_len = len(final_qr_string.encode('utf-8'))
                if encoded_final_qr_string_len > PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY:
                    logger.warning(
                        f"Chunk {i + 1} exceeds estimated QR capacity. Actual: {encoded_final_qr_string_len} bytes, Target Max: {PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY} bytes. Consider increasing MAX_OVERHEAD_PER_CHUNK_BYTES further.")
                chunks_for_qr.append(final_qr_string)
                current_byte_pos = end_byte_pos
            if current_byte_pos < total_bytes_length:
                logger.error(f"Chunking logic error: Only processed {current_byte_pos} of {total_bytes_length} bytes.")
                return []
            logger.info(f"Chunked data into {num_chunks} chunks for QR codes, with positional sequencing tags.")
            return chunks_for_qr
        except Exception as e:
            logger.error(f"Error chunking data: {e}")
            return []
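
# --- Illustrative helper (sketch only; not used by the app) ---
# chunk_data() wraps each QR payload as "{startN}" + JSON({"idx", "tc", "tl", "hash", "data"}) + "{endN}".
# The sketch below shows one way scanned QR strings could be reassembled into the original JSON
# document. The function name and behaviour are assumptions for illustration, not part of the app's API.
def reassemble_qr_chunks_sketch(qr_strings: List[str]) -> Optional[Any]:
    """Rebuild the original JSON object from chunk strings produced by EnhancedFileProcessor.chunk_data."""
    chunk_pattern = re.compile(r'^\{start(\d+)\}(.*)\{end\1\}$', re.DOTALL)
    parsed = []
    for s in qr_strings:
        match = chunk_pattern.match(s)
        if not match:
            logger.warning("Skipping string that does not look like a chunk_data payload.")
            continue
        parsed.append(json.loads(match.group(2)))
    if not parsed:
        return None
    # Order chunks by their 1-based index and concatenate the payload fragments.
    parsed.sort(key=lambda c: c.get('idx', 0))
    combined = "".join(c.get('data', '') for c in parsed)
    # Caveat: if a chunk boundary split a multi-byte UTF-8 character, chunk_data's errors='replace'
    # decoding means the original text may not reconstruct exactly.
    return json.loads(combined)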

def generate_stylish_qr(data: Union[str, Dict],
                        filename: str,
                        size: int = 10,
                        border: int = 4,
                        fill_color: str = "#000000",
                        back_color: str = "#FFFFFF") -> str:
    """Generate a stylish QR code with enhanced visual appeal."""
    try:
        qr = qrcode.QRCode(
            version=None,
            error_correction=qrcode.constants.ERROR_CORRECT_M,
            box_size=size,
            border=border
        )
        if isinstance(data, dict):
            qr.add_data(json.dumps(data, ensure_ascii=False, separators=(',', ':')))
        else:
            qr.add_data(str(data))
        qr.make(fit=True)
        qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)
        qr_image = qr_image.convert('RGBA')
        try:
            gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
            draw = ImageDraw.Draw(gradient)
            for i in range(qr_image.width):
                alpha = int(255 * (i / qr_image.width) * 0.05)
                draw.line([(i, 0), (i, qr_image.height)], fill=(0, 0, 0, alpha))
            final_image = Image.alpha_composite(qr_image, gradient)
        except Exception as e:
            logger.warning(f"Failed to add gradient overlay to QR code: {e}. Using plain QR.")
            final_image = qr_image
        output_path = QR_CODES_DIR / filename
        final_image.save(output_path, quality=90)
        return str(output_path)
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return ""

def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]:
    """Generate QR codes with enhanced visual appeal and metadata."""
    if not isinstance(data, (list, dict, str)):
        logger.error("generate_qr_codes received data that is not a list, dict, or string.")
        return []
    try:
        file_processor = EnhancedFileProcessor()
        paths = []
        if combined:
            chunks_of_combined_data = file_processor.chunk_data(data)
            if not chunks_of_combined_data:
                logger.warning("No chunks generated for combined data.")
                return []
            for i, chunk_str in enumerate(chunks_of_combined_data):
                filename = f'combined_qr_{i + 1}_of_{len(chunks_of_combined_data)}_{int(time.time())}.png'
                qr_path = generate_stylish_qr(
                    data=chunk_str,
                    filename=filename,
                    fill_color="#1a365d",
                    back_color="#ffffff"
                )
                if qr_path:
                    paths.append(qr_path)
                else:
                    logger.warning(f"Failed to generate QR for combined chunk {i + 1}/{len(chunks_of_combined_data)}.")
        else:
            if isinstance(data, list):
                for idx, item in enumerate(data):
                    item_chunks = file_processor.chunk_data(item)
                    if not item_chunks:
                        logger.warning(f"No chunks generated for item {idx + 1}.")
                        continue
                    for chunk_idx, chunk_str in enumerate(item_chunks):
                        filename = f'item_{idx + 1}_chunk_{chunk_idx + 1}_of_{len(item_chunks)}_{int(time.time())}.png'
                        qr_path = generate_stylish_qr(
                            data=chunk_str,
                            filename=filename,
                            fill_color="#1a365d",
                            back_color="#ffffff"
                        )
                        if qr_path:
                            paths.append(qr_path)
                        else:
                            logger.warning(
                                f"Failed to generate QR for item {idx + 1} chunk {chunk_idx + 1}/{len(item_chunks)}.")
            elif isinstance(data, (dict, str)):
                single_item_chunks = file_processor.chunk_data(data)
                if not single_item_chunks:
                    logger.warning("No chunks generated for single item.")
                    return []
                for chunk_idx, chunk_str in enumerate(single_item_chunks):
                    filename = f'single_item_chunk_{chunk_idx + 1}_of_{len(single_item_chunks)}_{int(time.time())}.png'
                    qr_path = generate_stylish_qr(
                        data=chunk_str,
                        filename=filename,
                        fill_color="#1a365d",
                        back_color="#ffffff"
                    )
                    if qr_path:
                        paths.append(qr_path)
                    else:
                        logger.warning(
                            f"Failed to generate QR for single item chunk {chunk_idx + 1}/{len(single_item_chunks)}.")
            else:
                logger.warning("Data is not a list, dict, or string and cannot be processed individually.")
        logger.info(f"Generated {len(paths)} QR codes.")
        return paths
    except Exception as e:
        logger.error(f"An unexpected error occurred in generate_qr_codes: {e}")
        return []

def respond_to_chat(
        message: str,
        chat_history: List[Tuple[str, str]],
        chatbot_data: Optional[List[Dict]],
        current_filtered_df_state: Optional[pd.DataFrame]
) -> Tuple[List[Tuple[str, str]], List[Dict], Optional[pd.DataFrame]]:
    """
    Respond to user chat messages based on the loaded JSON data.
    Manages and returns the state of the filtered DataFrame.
    """
    if chatbot_data is None or not chatbot_data:
        chat_history.append((message, "Please process some data first using the other tabs before chatting."))
        return chat_history, chatbot_data, current_filtered_df_state
    chat_history.append((message, ""))
    response = ""
    lower_message = message.lower().strip()
    new_filtered_df_state = current_filtered_df_state
    try:
        flat_data = []

        def flatten_item(d, parent_key='', sep='_'):
            items = []
            if isinstance(d, dict):
                for k, v in d.items():
                    new_key = parent_key + sep + k if parent_key else k
                    if isinstance(v, (dict, list)):
                        items.extend(flatten_item(v, new_key, sep=sep).items())
                    else:
                        items.append((new_key, v))
            elif isinstance(d, list):
                for i, elem in enumerate(d):
                    if isinstance(elem, (dict, list)):
                        items.extend(
                            flatten_item(elem, f'{parent_key}_{i}' if parent_key else str(i), sep=sep).items())
                    else:
                        items.append((f'{parent_key}_{i}' if parent_key else str(i), elem))
            return dict(items)

        for i, item in enumerate(chatbot_data):
            if isinstance(item, dict):
                extracted_data_part = item.get('extracted_data')
                if isinstance(extracted_data_part, (dict, list)):
                    flat_item_data = flatten_item(extracted_data_part, parent_key=f'item_{i}_extracted_data')
                    metadata_part = {k: v for k, v in item.items() if
                                     k not in ['extracted_data', 'raw_content', 'linked_extractions']}
                    flat_data.append({**metadata_part, **flat_item_data})
                else:
                    flat_data.append({k: v for k, v in item.items() if k != 'raw_content'})
            elif isinstance(item, list):
                # flatten_item returns a dict, so append it as one row (extending would add only the keys).
                flat_data.append(flatten_item(item, parent_key=f'item_{i}'))
            else:
                flat_data.append({f'item_{i}_value': item})
        df = None
        if flat_data:
            try:
                df = pd.DataFrame(flat_data)
                logger.debug(f"Created DataFrame with shape: {df.shape}")
                logger.debug(f"DataFrame columns: {list(df.columns)}")
            except Exception as e:
                logger.warning(
                    f"Could not create pandas DataFrame from processed data: {e}. Falling back to manual processing.")
                df = None
        if df is not None:
            if "what columns are available" in lower_message or "list columns" in lower_message:
                response = f"The available columns in the data are: {', '.join(df.columns)}"
            match = re.search(r'describe column (\w+)', lower_message)
            if match:
                column_name = match.group(1)
                if column_name in df.columns:
                    description = df[column_name].describe().to_string()
                    response = f"Description for column '{column_name}':\n```\n{description}\n```"
                else:
                    response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}"
            match = re.search(r'how many unique values in (\w+)', lower_message)
            if match:
                column_name = match.group(1)
                if column_name in df.columns:
                    unique_count = df[column_name].nunique()
                    response = f"There are {unique_count} unique values in the '{column_name}' column."
                else:
                    response = f"I couldn't find a column named '{column_name}' in the data. Available columns are: {', '.join(df.columns)}"
            match = re.search(r'what is the (average|sum|min|max) of (\w+)', lower_message)
            if match:
                operation, column_name = match.groups()
                if column_name in df.columns:
                    try:
                        numeric_col = pd.to_numeric(df[column_name], errors='coerce')
                        numeric_col = numeric_col.dropna()
                        if not numeric_col.empty:
                            if operation == 'average':
                                result = numeric_col.mean()
                                response = f"The average of '{column_name}' is {result:.2f}."
                            elif operation == 'sum':
                                result = numeric_col.sum()
                                response = f"The sum of '{column_name}' is {result:.2f}."
                            elif operation == 'min':
                                result = numeric_col.min()
                                response = f"The minimum of '{column_name}' is {result}."
                            elif operation == 'max':
                                result = numeric_col.max()
                                response = f"The maximum of '{column_name}' is {result}."
                            else:
                                response = "I can calculate average, sum, min, or max."
                        else:
                            response = f"The column '{column_name}' does not contain numeric values that I can analyze."
                    except Exception as e:
                        response = f"An error occurred while calculating the {operation} of '{column_name}': {e}"
                        logger.error(f"Error calculating {operation} for column '{column_name}': {e}")
                else:
                    response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}"
            filter_match = re.search(
                r'(?:filter|show items|show me items|find entries|select items|get items)\s+'
                r'(?:where|by|for|with|if)\s+'
                r'([\w\._-]+)\s+'  # Allow underscores, periods, and hyphens in column names
                r'(is|equals?|==|!=|>=?|<=?|contains?|starts with|ends with)\s+'
                r'([\'"]?[\w\s\.-]+[\'"]?)',
                lower_message
            )
            if filter_match:
                column_name, operator, value_str = filter_match.groups()
                column_name = column_name.strip()
                operator = operator.strip().lower()
                value_str = value_str.strip().strip("'\"")
                logger.info(f"Filter request: Column='{column_name}', Operator='{operator}', Value='{value_str}'")
                if column_name not in df.columns:
                    response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}"
                    new_filtered_df_state = None
                else:
                    df_to_filter = df.copy()  # Always filter from the full dataframe
                    try:
                        target_value: Any = None
                        col_dtype = df_to_filter[column_name].dtype
                        is_numeric_op = operator in ['>', '>=', '<', '<=', '==', '!=']
                        is_numeric_col = pd.api.types.is_numeric_dtype(col_dtype)
                        if is_numeric_op and is_numeric_col:
                            try:
                                target_value = float(value_str)
                                col_series = pd.to_numeric(df_to_filter[column_name], errors='coerce')
                            except ValueError:
                                response = f"For numeric column '{column_name}', '{value_str}' is not a valid number."
                        elif pd.api.types.is_bool_dtype(col_dtype) or value_str.lower() in ['true', 'false']:
                            target_value = value_str.lower() == 'true'
                            col_series = df_to_filter[column_name].astype(bool, errors='ignore')
                        else:  # Treat as string
                            target_value = str(value_str)
                            col_series = df_to_filter[column_name].astype(str).str.lower()
                            value_str_lower = target_value.lower()
                        if not response:  # No error so far
                            condition = None
                            if operator in ['is', 'equals', '==']:
                                if is_numeric_col or pd.api.types.is_bool_dtype(col_dtype):
                                    condition = col_series == target_value
                                else:
                                    condition = col_series == value_str_lower
                            elif operator == '!=':
                                if is_numeric_col or pd.api.types.is_bool_dtype(col_dtype):
                                    condition = col_series != target_value
                                else:
                                    condition = col_series != value_str_lower
                            elif operator == '>' and is_numeric_col:
                                condition = col_series > target_value
                            elif operator == '>=' and is_numeric_col:
                                condition = col_series >= target_value
                            elif operator == '<' and is_numeric_col:
                                condition = col_series < target_value
                            elif operator == '<=' and is_numeric_col:
                                condition = col_series <= target_value
                            elif operator in ['contains', 'contain']:
                                condition = df_to_filter[column_name].astype(str).str.contains(
                                    value_str, case=False, na=False)
                            elif operator == 'starts with':
                                # Series.str.startswith has no `case` argument; lowercase both sides instead.
                                condition = df_to_filter[column_name].astype(str).str.lower().str.startswith(
                                    value_str.lower(), na=False)
                            elif operator == 'ends with':
                                condition = df_to_filter[column_name].astype(str).str.lower().str.endswith(
                                    value_str.lower(), na=False)
                            else:
                                response = f"Unsupported operator '{operator}' for column '{column_name}' (type: {col_dtype})."
                            if condition is not None:
                                filtered_results_df = df_to_filter[condition]
                                if not filtered_results_df.empty:
                                    new_filtered_df_state = filtered_results_df
                                    num_results = len(filtered_results_df)
                                    preview_rows = min(num_results, 5)
                                    preview_cols = min(len(filtered_results_df.columns), 5)
                                    preview_df = filtered_results_df.head(preview_rows).iloc[:, :preview_cols]
                                    preview_str = preview_df.to_string(index=False)
                                    response = (
                                        f"Found {num_results} items where '{column_name}' {operator} '{value_str}'.\n"
                                        f"Here's a preview:\n```\n{preview_str}\n```\n"
                                        f"The full filtered dataset is now available for download using the 'Download Filtered JSON' button.")
                                else:
                                    new_filtered_df_state = pd.DataFrame()  # Empty dataframe
                                    response = f"No items found where '{column_name}' {operator} '{value_str}'."
                    except ValueError as ve:
                        response = f"Invalid value '{value_str}' for numeric column '{column_name}'. {ve}"
                        new_filtered_df_state = None
                        logger.warning(f"ValueError during filter: {ve}")
                    except Exception as e:
                        new_filtered_df_state = None
                        response = f"An error occurred while applying the filter: {e}"
                        logger.error(
                            f"Error applying filter (column='{column_name}', op='{operator}', val='{value_str}'): {e}")
| elif "output as csv" in lower_message or "export as csv" in lower_message: | |
| if df is not None and not df.empty: | |
| csv_output = df.to_csv(index=False) | |
| response = f"Here is the data in CSV format:\n```csv\n{csv_output[:1000]}...\n```\n(Output truncated for chat display)" | |
| else: | |
| response = "There is no data available to output as CSV." | |
| elif "output as json" in lower_message or "export as json" in lower_message: | |
| if df is not None and not df.empty: | |
| json_output = df.to_json(orient='records', indent=2) | |
| response = f"Here is the data in JSON format:\n```json\n{json_output[:1000]}...\n```\n(Output truncated for chat display)" | |
| else: | |
| response = "There is no data available to output as JSON." | |
| if not response: | |
| if "how many items" in lower_message or "number of items" in lower_message: | |
| if new_filtered_df_state is not None and not new_filtered_df_state.empty: | |
| response = f"The currently filtered dataset has {len(new_filtered_df_state)} items. The original dataset has {len(df if df is not None else chatbot_data)} items." | |
| elif df is not None: | |
| response = f"There are {len(df)} top-level items in the processed data." | |
| elif isinstance(chatbot_data, list): | |
| response = f"There are {len(chatbot_data)} top-level items in the processed data (not in DataFrame)." | |
| elif isinstance(chatbot_data, dict): | |
| response = "The processed data is a single dictionary, not a list of items." | |
| else: | |
| response = "The processed data is not a standard list or dictionary structure." | |
| elif "what is the structure" in lower_message or "tell me about the data" in lower_message: | |
| if new_filtered_df_state is not None and not new_filtered_df_state.empty: | |
| response = f"The filtered data has columns: {', '.join(new_filtered_df_state.columns)}. " | |
| if df is not None: | |
| response += f"The original data has columns: {', '.join(df.columns)}." | |
| else: | |
| response += "Original data structure is not tabular." | |
| elif df is not None: | |
| response = f"The data is a table with {len(df)} rows and columns: {', '.join(df.columns)}." | |
| elif isinstance(chatbot_data, list) and chatbot_data: | |
| sample_item = chatbot_data[0] | |
| response = f"The data is a list containing {len(chatbot_data)} items. The first item has the following top-level keys: {list(sample_item.keys())}." | |
| elif isinstance(chatbot_data, dict): | |
| response = f"The data is a dictionary with the following top-level keys: {list(chatbot_data.keys())}." | |
| else: | |
| response = "The processed data is not a standard list or dictionary structure that I can easily describe." | |
| elif "show me" in lower_message or "get me" in lower_message or "extract" in lower_message: | |
| response = "If you want to filter the data, please use a phrase like 'show me items where column_name is value'. If you want to see the raw data, consider using the download buttons." | |
| elif "how can i modify" in lower_message or "how to change" in lower_message or "can i add" in lower_message or "can i remove" in lower_message: | |
| response = "I cannot directly modify the data here, but I can tell you how you *could* modify it. What kind of change are you considering (e.g., adding an item, changing a value, removing a field)?" | |
| elif "add a field" in lower_message or "add a column" in lower_message: | |
| response = "To add a field (or column if the data is tabular), you would typically iterate through each item (or row) in the data and add the new key-value pair. For example, adding a 'status' field with a default value." | |
| elif "change a value" in lower_message or "update a field" in lower_message: | |
| response = "To change a value, you would need to identify the specific item(s) and the field you want to update. You could use a condition (like filtering) to find the right items and then assign a new value to the field." | |
| elif "remove a field" in lower_message or "delete a column" in lower_message: | |
| response = "To remove a field, you would iterate through each item and delete the specified key. Be careful, as this is irreversible." | |
| elif "restructure" in lower_message or "change the format" in lower_message: | |
| response = "Restructuring data involves transforming it into a different shape. This could mean flattening nested objects, grouping items, or pivoting data. This often requires writing custom code to map the old structure to the new one." | |
| elif "what if i" in lower_message or "if i changed" in lower_message: | |
| response = "Tell me what specific change you're contemplating, and I can speculate on the potential impact or how you might approach it programmatically." | |
| elif "hello" in lower_message or "hi" in lower_message: | |
| response = random.choice(["Hello! How can I help you understand the processed data?", | |
| "Hi there! What's on your mind about this data?", | |
| "Hey! Ask me anything about the data you've loaded."]) | |
| elif "thank you" in lower_message or "thanks" in lower_message: | |
| response = random.choice(["You're welcome!", "Glad I could help.", | |
| "No problem! Let me know if you have more questions about the data."]) | |
| elif "clear chat" in lower_message: | |
| chat_history = [] | |
| response = "Chat history cleared." | |
| new_filtered_df_state = None | |
| elif not response: | |
| response = random.choice([ | |
| "I can analyze the data you've processed. What would you like to know? Try asking to filter data, e.g., 'show items where status is active'.", | |
| "Ask me about the number of items, the structure, or values of specific fields. You can also filter data.", | |
| "I can perform basic analysis or filter the data. For example: 'filter by price > 100'.", | |
| "Tell me what you want to extract or filter from the data. Use phrases like 'show items where ...'.", | |
| "I'm equipped to filter your data. Try 'find entries where name contains widget'." | |
| ]) | |
| except Exception as e: | |
| logger.error(f"Chatbot runtime error: {e}") | |
| response = f"An internal error occurred while processing your request: {e}" | |
| response += "\nPlease try rephrasing your question or clear the chat history." | |
| if not response: | |
| response = "I'm not sure how to respond to that. Please try rephrasing or ask for help on available commands." | |
| if chat_history and chat_history[-1][1] == "": | |
| chat_history[-1] = (chat_history[-1][0], response) | |
| return chat_history, chatbot_data, new_filtered_df_state | |
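| # The ZIP archive is written into TEMP_DIR so its path can be handed straight to the gr.File output for download. | |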
| def create_qr_zip(qr_paths: List[str]) -> Optional[str]: | |
| """Creates a zip archive from a list of QR code image paths.""" | |
| if not qr_paths: | |
| logger.warning("Attempted to create a zip archive, but no QR code paths were provided.") | |
| return None # Return None to prevent Gradio from attempting a download | |
| try: | |
| timestamp = int(time.time()) | |
| zip_filename = f"qr_code_collection_{timestamp}.zip" | |
| zip_filepath = TEMP_DIR / zip_filename | |
| with zipfile.ZipFile(zip_filepath, 'w') as zipf: | |
| for path_str in qr_paths: | |
| path = Path(path_str) | |
| if path.exists(): | |
| # Use path.name to avoid storing the full directory structure in the zip | |
| zipf.write(path, arcname=path.name) | |
| else: | |
| logger.warning(f"QR code file not found, skipping: {path_str}") | |
| logger.info(f"Successfully created QR code zip archive: {zip_filepath}") | |
| return str(zip_filepath) | |
| except Exception as e: | |
| logger.error(f"Failed to create QR code zip archive: {e}") | |
| return None | |
| # --- Gradio Interface Definition --- | |
| def create_modern_interface(): | |
| """Create a modern and visually appealing Gradio interface""" | |
| css = """ | |
| /* Modern color scheme */ | |
| :root { | |
| --primary-color: #1a365d; | |
| --secondary-color: #2d3748; | |
| --accent-color: #4299e1; | |
| --background-color: #f7fafc; | |
| --success-color: #48bb78; | |
| --error-color: #f56565; | |
| --warning-color: #ed8936; | |
| } | |
| /* Component styling */ | |
| .input-container { | |
| background-color: white; | |
| padding: 1.5rem; | |
| border-radius: 0.5rem; | |
| border: 1px solid #e2e8f0; | |
| margin-bottom: 1rem; | |
| } | |
| /* Button styling */ | |
| .primary-button { | |
| background-color: var(--primary-color); | |
| color: white; | |
| padding: 0.75rem 1.5rem; | |
| border-radius: 0.375rem; | |
| border: none; | |
| cursor: pointer; | |
| transition: all 0.2s; | |
| } | |
| .primary-button:hover { | |
| background-color: var(--accent-color); | |
| transform: translateY(-1px); | |
| } | |
| /* Gallery styling */ | |
| .gallery { | |
| display: grid; | |
| grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); | |
| gap: 1rem; | |
| padding: 1rem; | |
| background-color: white; | |
| border-radius: 0.5rem; | |
| border: 1px solid #e2e8f0; | |
| } | |
| .gallery img { | |
| width: 100%; | |
| height: auto; | |
| border-radius: 0.375rem; | |
| transition: transform 0.2s; | |
| } | |
| .gallery img:hover { | |
| transform: scale(1.05); | |
| } | |
| /* QR Code Viewport Styling */ | |
| .viewport-container { | |
| display: grid; | |
| gap: 0.5rem; | |
| padding: 1rem; | |
| background-color: white; | |
| border-radius: 0.5rem; | |
| border: 1px solid #e2e8f0; | |
| margin-top: 1rem; | |
| } | |
| .viewport-item { | |
| display: flex; | |
| flex-direction: column; | |
| align-items: center; | |
| } | |
| .viewport-item img { | |
| width: 100%; | |
| height: auto; | |
| border-radius: 0.375rem; | |
| transition: transform 0.2s; | |
| max-width: 150px; | |
| max-height: 150px; | |
| } | |
| /* --- NEW: Fullscreen Enhancements --- */ | |
| #fullscreen-viewport-wrapper:fullscreen { | |
| background-color: var(--background-color) !important; | |
| overflow-y: auto; | |
| padding: 2rem; | |
| } | |
| #fullscreen-viewport-wrapper:fullscreen .viewport-container { | |
| grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); | |
| } | |
| #fullscreen-viewport-wrapper:fullscreen .viewport-item img { | |
| max-width: none; | |
| max-height: none; | |
| } | |
| """ | |
| with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface: | |
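| # Inject helper JavaScript into the page head: one function syncs the viewport checkbox selections, another requests fullscreen on the viewport wrapper. | |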
| interface.head += """ | |
| <script> | |
| let enabledStates = []; | |
| function updateEnabledStates(checkbox) { | |
| const index = parseInt(checkbox.dataset.index); | |
| if (checkbox.checked) { | |
| if (!enabledStates.includes(index)) { | |
| enabledStates.push(index); | |
| } | |
| } else { | |
| enabledStates = enabledStates.filter(item => item !== index); | |
| } | |
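| // Best-effort sync back to the Gradio 'enabled_qr_codes' state; the selector assumes Gradio exposes these data attributes and may not match in every version. | |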
| const enabled_qr_codes_component = document.querySelector('[data-component-type="state"][data-state-name="enabled_qr_codes"]'); | |
| if (enabled_qr_codes_component) { | |
| enabled_qr_codes_component.value = JSON.stringify(enabledStates); | |
| enabled_qr_codes_component.dispatchEvent(new Event('input')); | |
| } | |
| console.log("Enabled QR Code Indices:", enabledStates); | |
| } | |
| function goFullscreen(elementId) { | |
| const elem = document.getElementById(elementId); | |
| if (!elem) return; | |
| if (elem.requestFullscreen) { | |
| elem.requestFullscreen(); | |
| } else if (elem.webkitRequestFullscreen) { /* Safari */ | |
| elem.webkitRequestFullscreen(); | |
| } else if (elem.msRequestFullscreen) { /* IE11 */ | |
| elem.msRequestFullscreen(); | |
| } | |
| } | |
| </script> | |
| """ | |
| qr_code_paths = gr.State([]) | |
| chatbot_data = gr.State(None) | |
| gr.Markdown(""" | |
| # 🚀 Advanced Data Processing & QR Code Generator | |
| Process URLs, files, or raw JSON and turn the results into sequenced QR codes. | |
| """) | |
| with gr.Row(): | |
| crawl_depth_slider = gr.Slider( | |
| label="Crawl Depth", | |
| minimum=0, | |
| maximum=10, | |
| value=0, | |
| step=1, | |
| interactive=True, | |
| info="Select the maximum depth for crawling links (0-10)." | |
| ) | |
| with gr.Tab("π URL Processing"): | |
| url_input = gr.Textbox( | |
| label="Enter URLs (comma or newline separated)", | |
| lines=5, | |
| placeholder="https://example1.com\nhttps://example2.com", | |
| value="" | |
| ) | |
| with gr.Tab("π File Input"): | |
| file_input = gr.File( | |
| label="Upload Files", | |
| file_types=None, | |
| file_count="multiple" | |
| ) | |
| with gr.Tab("π JSON Input"): | |
| text_input = gr.TextArea( | |
| label="Direct JSON Input", | |
| lines=15, | |
| placeholder="Paste your JSON data here...", | |
| value="" | |
| ) | |
| with gr.Row(): | |
| example_btn = gr.Button("π Load Example", variant="secondary") | |
| clear_btn = gr.Button("ποΈ Clear", variant="secondary") | |
| with gr.Row(): | |
| combine_data = gr.Checkbox( | |
| label="Combine all data into sequence", | |
| value=True, | |
| info="Generate sequential QR codes for combined data" | |
| ) | |
| generate_qr_toggle = gr.Checkbox( | |
| label="Generate QR Codes", | |
| value=False, | |
| info="Enable to generate QR codes for the processed data." | |
| ) | |
| process_btn = gr.Button( | |
| "π Process & Generate QR", | |
| variant="primary" | |
| ) | |
| # --- NEW: Two-Column Output Layout --- | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| output_json = gr.JSON(label="Processed Data") | |
| with gr.Column(scale=1): | |
| output_gallery = gr.Gallery( | |
| label="Generated QR Codes", | |
| columns=None, | |
| height="auto", | |
| show_label=True, | |
| elem_classes=["gallery"] | |
| ) | |
| download_qrs_btn = gr.Button("β¬οΈ Download All QR Codes as ZIP") | |
| qr_zip_output = gr.File(label="Download QR Code ZIP", interactive=False) | |
| output_text = gr.Textbox( | |
| label="Processing Status", | |
| interactive=False, | |
| lines=8 | |
| ) | |
| # --- End of New Layout --- | |
| with gr.Tab("πΌοΈ QR Code Viewport") as viewport_tab: | |
| viewport_output = gr.HTML(label="QR Code Sequence Viewport") | |
| enabled_qr_codes = gr.State([]) | |
| with gr.Tab("π€ Chat with Data") as chat_tab: | |
| chat_history = gr.State([]) | |
| chatbot = gr.Chatbot(label="Data Chatbot", height=500) | |
| filtered_chatbot_df_state = gr.State(None) # To store the filtered DataFrame | |
| with gr.Row(): | |
| chat_input = gr.Textbox(label="Your Message", placeholder="Ask me about the processed data...") | |
| send_msg_btn = gr.Button("Send") | |
| with gr.Row(): | |
| download_full_json_btn = gr.Button("Download Full JSON") | |
| download_filtered_json_btn = gr.Button("Download Filtered JSON") | |
| download_file_output = gr.File(label="Download Data", interactive=False) | |
| clear_chat_btn = gr.Button("Clear Chat History") | |
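| # --- Callback helpers wired to the components above --- | |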
| def load_example(): | |
| example = { | |
| "type": "product_catalog", | |
| "items": [ | |
| { | |
| "id": "123", | |
| "name": "Premium Widget", | |
| "description": "High-quality widget with advanced features", | |
| "price": 299.99, | |
| "category": "electronics", | |
| "tags": ["premium", "featured", "new"] | |
| }, | |
| { | |
| "id": "456", | |
| "name": "Basic Widget", | |
| "description": "Reliable widget for everyday use", | |
| "price": 149.99, | |
| "category": "electronics", | |
| "tags": ["basic", "popular"] | |
| } | |
| ], | |
| "metadata": { | |
| "timestamp": datetime.now().isoformat(), | |
| "version": "2.0", | |
| "source": "example" | |
| } | |
| } | |
| return json.dumps(example, indent=2) | |
| def clear_input(): | |
| return "", None, "", None | |
| def update_viewport(paths, enabled_states): | |
| if not paths: | |
| return "<p>No QR codes generated yet.</p>" | |
| # Wrapper div with an ID for fullscreen targeting | |
| html_content = '<div id="fullscreen-viewport-wrapper" style="padding:1rem; border: 1px solid #ddd; border-radius: 0.5rem;">' | |
| # Fullscreen button | |
| html_content += '<button onclick="goFullscreen(\'fullscreen-viewport-wrapper\')" class="primary-button" style="margin-bottom: 1rem;">View Fullscreen</button>' | |
| num_qr_codes = len(paths) | |
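| # Lay the codes out in a roughly square grid, capped at 8 columns. | |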
| cols = math.ceil(math.sqrt(num_qr_codes)) | |
| cols = max(1, min(cols, 8)) | |
| html_content += f'<div class="viewport-container" style="grid-template-columns: repeat({cols}, 1fr);">' | |
| if not enabled_states or any(i >= num_qr_codes for i in enabled_states): | |
| enabled_states = list(range(num_qr_codes)) | |
| for i, path in enumerate(paths): | |
| is_enabled = i in enabled_states | |
| border = "border: 2px solid var(--success-color);" if is_enabled else "border: 2px solid #ccc;" | |
| opacity = "opacity: 1.0;" if is_enabled else "opacity: 0.5;" | |
| html_content += f'<div class="viewport-item" id="qr_item_{i}">' | |
| html_content += f'<img src="/file={path}" style="{border} {opacity}" alt="QR Code {i + 1}">' | |
| html_content += f'<label style="font-size: 0.8em; margin-top: 4px;"><input type="checkbox" data-index="{i}" {"checked" if is_enabled else ""} onchange="updateEnabledStates(this)"> Enable</label>' | |
| html_content += '</div>' | |
| html_content += '</div>' | |
| html_content += '</div>' | |
| return html_content | |
| def process_inputs(urls, files, text, combine, crawl_depth, generate_qr_enabled): | |
| """Process all inputs and generate QR codes based on toggle""" | |
| results = [] | |
| processing_status_messages = [] | |
| qr_paths = [] | |
| final_json_output = None | |
| url_processor = EnhancedURLProcessor() | |
| file_processor = EnhancedFileProcessor() | |
| try: | |
| if text and text.strip(): | |
| try: | |
| json_data = json.loads(text) | |
| results.append({ | |
| 'source': 'json_input', | |
| 'extracted_data': json_data, | |
| 'timestamp': datetime.now().isoformat(), | |
| 'processing_notes': ['Parsed from direct JSON input.'] | |
| }) | |
| processing_status_messages.append("β Successfully parsed direct JSON input.") | |
| except json.JSONDecodeError as e: | |
| processing_status_messages.append(f"β Invalid JSON format in text input: {str(e)}") | |
| except Exception as e: | |
| processing_status_messages.append(f"β Error processing direct JSON input: {str(e)}") | |
| if urls and urls.strip(): | |
| url_list = re.split(r'[,\n]', urls) | |
| url_list = [url.strip() for url in url_list if url.strip()] | |
| for url in url_list: | |
| processing_status_messages.append( | |
| f"π Processing URL: {url} with crawl depth {crawl_depth}...") | |
| content_result = url_processor.fetch_content_with_depth(url, max_steps=crawl_depth) | |
| if content_result: | |
| results.append(content_result) | |
| if content_result.get('fetch_result') is not None: | |
| processing_status_messages.append(f"β Processed URL: {url} (Level 0)") | |
| if content_result.get('processing_notes'): | |
| processing_status_messages.append( | |
| f" Notes: {'; '.join(content_result['processing_notes'])}") | |
| if content_result.get('linked_extractions'): | |
| num_linked_processed = len([r for r in content_result['linked_extractions'] if | |
| r and r.get('fetch_result') is not None]) | |
| processing_status_messages.append( | |
| f" Found and processed {num_linked_processed}/{len(content_result['linked_extractions'])} direct links.") | |
| else: | |
| processing_status_messages.append(f"β Failed to process URL: {url}") | |
| if content_result.get('processing_notes'): | |
| processing_status_messages.append( | |
| f" Notes: {'; '.join(content_result['processing_notes'])}") | |
| else: | |
| processing_status_messages.append( | |
| f"β Failed to process URL: {url} (No result returned)") | |
| if files: | |
| for file in files: | |
| processing_status_messages.append(f"π Processing file: {file.name}...") | |
| file_results = file_processor.process_file(file) | |
| if file_results: | |
| results.extend(file_results) | |
| processing_status_messages.append(f"β Processed file: {file.name}") | |
| for res in file_results: | |
| if res.get('processing_notes'): | |
| processing_status_messages.append( | |
| f" Notes for {res.get('filename', 'item')}: {'; '.join(res['processing_notes'])}") | |
| else: | |
| processing_status_messages.append(f"β Failed to process file: {file.name}") | |
| qr_paths = [] | |
| final_json_output = None | |
| if results: | |
| final_json_output = results | |
| if generate_qr_enabled: | |
| processing_status_messages.append("βοΈ Generating QR codes as requested...") | |
| qr_paths = generate_qr_codes(results, combine) | |
| if qr_paths: | |
| processing_status_messages.append(f"β Successfully generated {len(qr_paths)} QR codes.") | |
| else: | |
| processing_status_messages.append( | |
| "β Failed to generate QR codes (empty result or error). Check logs for details.)") | |
| else: | |
| processing_status_messages.append( | |
| "βοΈ QR code generation was disabled. Processed data is available.") | |
| else: | |
| processing_status_messages.append("β οΈ No valid content collected from inputs.") | |
| final_json_output = {} | |
| except Exception as e: | |
| logger.error(f"Overall processing error in process_inputs: {e}") | |
| processing_status_messages.append(f"β An unexpected error occurred during processing: {str(e)}") | |
| return ( | |
| final_json_output, | |
| [str(path) for path in qr_paths], | |
| "\n".join(processing_status_messages), | |
| final_json_output, | |
| None | |
| ) | |
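| # The gallery value may come back as plain paths or (path, caption) pairs depending on the Gradio version; normalize to path strings before storing in state. | |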
| def on_qr_generation(qr_paths_list): | |
| items = qr_paths_list if qr_paths_list is not None else [] | |
| paths = [str(item[0]) if isinstance(item, (list, tuple)) and item else str(item) for item in items] | |
| initial_enabled_states = list(range(len(paths))) | |
| return paths, initial_enabled_states | |
| # Event Handlers | |
| example_btn.click(load_example, inputs=[], outputs=text_input) | |
| clear_btn.click(clear_input, inputs=[], outputs=[url_input, file_input, text_input, chatbot_data]) | |
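| # Processing runs first; the chained step then copies the generated QR paths into state and enables every code by default. | |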
| process_btn.click( | |
| process_inputs, | |
| inputs=[url_input, file_input, text_input, combine_data, crawl_depth_slider, generate_qr_toggle], | |
| outputs=[output_json, output_gallery, output_text, chatbot_data, qr_zip_output] | |
| ).then( | |
| on_qr_generation, | |
| inputs=[output_gallery], | |
| outputs=[qr_code_paths, enabled_qr_codes] | |
| ) | |
| download_qrs_btn.click( | |
| fn=create_qr_zip, | |
| inputs=[qr_code_paths], | |
| outputs=[qr_zip_output] | |
| ) | |
| viewport_tab.select(update_viewport, inputs=[qr_code_paths, enabled_qr_codes], outputs=[viewport_output]) | |
| send_msg_btn.click( | |
| respond_to_chat, | |
| inputs=[chat_input, chat_history, chatbot_data, filtered_chatbot_df_state], | |
| outputs=[chatbot, chatbot_data, filtered_chatbot_df_state] | |
| ).then( | |
| lambda: "", | |
| inputs=None, | |
| outputs=chat_input | |
| ) | |
| chat_input.submit( | |
| respond_to_chat, | |
| inputs=[chat_input, chat_history, chatbot_data, filtered_chatbot_df_state], | |
| outputs=[chatbot, chatbot_data, filtered_chatbot_df_state] | |
| ).then( | |
| lambda: "", | |
| inputs=None, | |
| outputs=chat_input | |
| ) | |
| clear_chat_btn.click( | |
| lambda: ([], None, []), | |
| inputs=None, | |
| outputs=[chatbot, filtered_chatbot_df_state, chat_history] | |
| ) | |
| def download_json_data(data: Optional[Union[pd.DataFrame, List[Dict]]], filename_prefix: str) -> Optional[str]: | |
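| """Serialize a DataFrame or list of dicts to a timestamped JSON file in TEMP_DIR and return its path, or None if there is nothing to write.""" | |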
| if data is None: | |
| logger.info(f"No data provided for download with prefix '{filename_prefix}'.") | |
| return None | |
| data_to_dump = None | |
| if isinstance(data, pd.DataFrame): | |
| if data.empty: | |
| logger.info(f"DataFrame for '{filename_prefix}' is empty. Nothing to download.") | |
| return None | |
| data_to_dump = data.to_dict(orient='records') | |
| elif isinstance(data, list): | |
| if not data: | |
| logger.info(f"List for '{filename_prefix}' is empty. Nothing to download.") | |
| return None | |
| data_to_dump = data | |
| if data_to_dump is None: | |
| return None | |
| try: | |
| json_str = json.dumps(data_to_dump, indent=2, ensure_ascii=False) | |
| timestamp = int(time.time()) | |
| filename = f"{filename_prefix}_{timestamp}.json" | |
| file_path = TEMP_DIR / filename | |
| with open(file_path, 'w', encoding='utf-8') as f: | |
| f.write(json_str) | |
| logger.info(f"Successfully created JSON file for download: {file_path}") | |
| return str(file_path) | |
| except Exception as e: | |
| logger.error(f"Error creating JSON file for {filename_prefix}: {e}") | |
| return None | |
| def handle_download_full_json(current_chatbot_data_state: Optional[List[Dict]]) -> Optional[str]: | |
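| """Return a downloadable JSON file of the full processed dataset, or warn if nothing has been processed yet.""" | |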
| if not current_chatbot_data_state: | |
| logger.info("No full data available to download.") | |
| gr.Warning("No data has been processed yet!") | |
| return None | |
| return download_json_data(current_chatbot_data_state, "full_data_collection") | |
| def handle_download_filtered_json(current_filtered_df_state: Optional[pd.DataFrame]) -> Optional[str]: | |
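| """Return a downloadable JSON file of the chatbot-filtered DataFrame, or warn if no filter has been applied.""" | |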
| if current_filtered_df_state is None or current_filtered_df_state.empty: | |
| logger.info("No filtered data available to download.") | |
| gr.Warning("No filtered data to download. Please filter data in the chat first.") | |
| return None | |
| return download_json_data(current_filtered_df_state, "filtered_data") | |
| download_full_json_btn.click( | |
| fn=handle_download_full_json, | |
| inputs=[chatbot_data], | |
| outputs=[download_file_output] | |
| ) | |
| download_filtered_json_btn.click( | |
| fn=handle_download_filtered_json, | |
| inputs=[filtered_chatbot_df_state], | |
| outputs=[download_file_output] | |
| ) | |
| gr.Markdown(""" | |
| ### 🌟 Features | |
| - **Enhanced URL Scraping**: Extracts HTML text, title, meta description, and links, and attempts to parse JSON/XML from URLs based on content type. Supports crawling links up to a specified depth. | |
| - **Advanced File Processing**: Reads various text-based files, HTML, XML, CSV, and attempts text extraction from common documents (.pdf, .docx, .rtf, .odt). | |
| - **Archive Support**: Extracts and processes supported files from .zip, .tar, .gz archives. | |
| - **Data Chatbot**: Interact conversationally with the processed JSON data to ask questions, filter, and get insights. | |
| - **Sequential QR Codes**: Chunks large data and embeds sequencing info for reconstruction. | |
| - **QR Code Viewport**: A dedicated tab with a **fullscreen mode** for viewing the entire QR code collection. | |
| - **Bulk Download**: Download all generated QR codes as a single ZIP file. | |
| ### 💡 Tips | |
| 1. **Layout**: The output is split into two columns: raw JSON on the left, and the QR Code gallery + status log on the right. This prevents the status log from hiding the QR codes. | |
| 2. **Fullscreen**: For the best viewing experience of all QR codes, navigate to the **"QR Code Viewport"** tab and click the **"View Fullscreen"** button. | |
| 3. **Download**: Use the **"Download All QR Codes as ZIP"** button located directly under the QR code gallery to save all images at once. | |
| """) | |
| return interface | |
| def main(): | |
| """Initialize and launch the application""" | |
| try: | |
| mimetypes.init() | |
| interface = create_modern_interface() | |
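| # allowed_paths (added below) lets Gradio serve the QR images that the viewport HTML references via /file= URLs. | |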
| interface.launch( | |
| share=False, | |
| debug=False, | |
| show_error=True, | |
| show_api=False, | |
| allowed_paths=[str(OUTPUTS_DIR)] | |
| ) | |
| except Exception as e: | |
| logger.error(f"Application startup error: {e}") | |
| print(f"\nFatal Error: {e}\nCheck the logs for details.") | |
| raise | |
| if __name__ == "__main__": | |
| main() |