Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| import re | |
| import time | |
| import logging | |
| import mimetypes | |
| import concurrent.futures | |
| import string | |
| import zipfile | |
| import tempfile | |
| from datetime import datetime | |
| from typing import List, Dict, Optional, Union | |
| from pathlib import Path | |
| from urllib.parse import urlparse | |
| import requests | |
| import validators | |
| import gradio as gr | |
| from diskcache import Cache | |
| from bs4 import BeautifulSoup | |
| from fake_useragent import UserAgent | |
| from ratelimit import limits, sleep_and_retry | |
| from cleantext import clean | |
| import qrcode | |
# Application-wide logging: every record goes to the console and is also
# appended to a UTF-8 encoded ``app.log`` in the working directory.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8'),
    ],
)
logger = logging.getLogger(__name__)

# Generated QR images are written here; create the folder up front.
Path('output/qr_codes').mkdir(exist_ok=True, parents=True)
class URLProcessor:
    """Validate URLs and download their textual content.

    One ``requests.Session`` with browser-like headers is shared by all
    requests made through an instance. Every fetch method returns a dict with
    the keys ``content``, ``content_type`` and ``timestamp``, or ``None`` when
    the fetch failed (failures are logged, never raised).
    """

    # Static fallback used when fake_useragent cannot supply a random agent.
    DEFAULT_USER_AGENT = (
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/120.0 Safari/537.36'
    )

    def __init__(self):
        self.session = requests.Session()
        self.timeout = 10  # seconds
        try:
            user_agent = UserAgent().random
        except Exception as e:
            # A missing user-agent database must not prevent URL processing.
            logger.warning(f"Could not pick a random User-Agent: {e}. Using default.")
            user_agent = self.DEFAULT_USER_AGENT
        self.session.headers.update({
            'User-Agent': user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            # 'br' is deliberately not advertised: Brotli bodies can only be
            # decoded when an optional Brotli package is installed, and an
            # undecoded body would surface as garbage in ``response.text``.
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })

    @staticmethod
    def _basic_text_cleaning(text: str) -> str:
        """Dependency-free cleaner used when ``clean`` is unavailable or fails.

        Whitespace (including newlines and tabs) is turned into spaces *before*
        control characters are stripped, so words separated only by a line
        break are not glued together.
        """
        text = re.sub(r'\s', ' ', text)  # Keep word boundaries
        text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)  # Remove control characters
        text = text.encode('ascii', 'ignore').decode('ascii')  # Remove non-ASCII characters
        text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
        return text.strip()

    @staticmethod
    def _host_matches(url: str, domain: str) -> bool:
        """Return True when the host of *url* is *domain* or one of its subdomains."""
        try:
            hostname = (urlparse(url).hostname or '').lower()
        except ValueError:
            # urlparse rejects some malformed netlocs (e.g. broken IPv6 literals).
            return False
        return hostname == domain or hostname.endswith('.' + domain)

    def advanced_text_cleaning(self, text: str) -> str:
        """Normalise *text* with ``clean``; fall back to a regex cleaner on error."""
        try:
            return clean(
                text,
                fix_unicode=True,
                to_ascii=True,
                lower=True,
                no_line_breaks=True,
                no_urls=True,
                no_emails=True,
                no_phone_numbers=True,
                no_numbers=False,
                no_digits=False,
                no_currency_symbols=True,
                no_punct=False
            ).strip()
        except Exception as e:
            logger.warning(f"Text cleaning error: {e}. Using fallback method.")
            return self._basic_text_cleaning(text)

    def validate_url(self, url: str) -> Dict:
        """Validate URL format and accessibility.

        Returns a dict with ``is_valid`` (bool) and a human-readable
        ``message``. Never raises.
        """
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format'}
            response = self.session.head(url, timeout=self.timeout)
            if response.status_code >= 400:
                # Some servers refuse HEAD although GET works; retry with a
                # streamed GET so the body is not downloaded just to validate.
                response.close()
                response = self.session.get(url, timeout=self.timeout, stream=True)
            try:
                response.raise_for_status()
            finally:
                response.close()
            return {'is_valid': True, 'message': 'URL is valid and accessible'}
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}

    def fetch_content(self, url: str) -> Optional[Dict]:
        """Universal content fetcher with special case handling.

        Google Drive and Google Calendar (iCal) links are dispatched on the
        URL's *host*, so a URL that merely mentions those hosts in its path or
        query string is still processed as ordinary HTML.
        """
        try:
            # Google Drive document handling
            if self._host_matches(url, 'drive.google.com'):
                return self._handle_google_drive(url)
            # Google Calendar ICS handling
            if self._host_matches(url, 'calendar.google.com') and 'ical' in url:
                return self._handle_google_calendar(url)
            # Standard HTML processing
            return self._fetch_html_content(url)
        except Exception as e:
            logger.error(f"Content fetch failed: {e}")
            return None

    def _handle_google_drive(self, url: str) -> Optional[Dict]:
        """Download a Google Drive file referenced by a ``/file/d/<id>`` link."""
        try:
            file_id = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
            if not file_id:
                logger.error(f"Invalid Google Drive URL: {url}")
                return None
            direct_url = f"https://drive.google.com/uc?export=download&id={file_id.group(1)}"
            response = self.session.get(direct_url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Google Drive processing failed: {e}")
            return None

    def _handle_google_calendar(self, url: str) -> Optional[Dict]:
        """Download a Google Calendar ICS feed and return it unmodified."""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': 'text/calendar',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Calendar fetch failed: {e}")
            return None

    def _fetch_html_content(self, url: str) -> Optional[Dict]:
        """Download an HTML page and return its cleaned main text.

        Scripts, styles and page chrome are removed; the text is taken from
        ``<main>``, else ``<article>``, else ``<body>``. A page with none of
        these yields an empty ``content`` string rather than ``None``.
        """
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            content_type = response.headers.get('Content-Type', '')
            soup = BeautifulSoup(response.text, 'html.parser')
            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
                element.decompose()
            # Extract main content
            main_content = soup.find('main') or soup.find('article') or soup.body
            if main_content is None:
                logger.warning(f"No main content found for URL: {url}")
                return {
                    'content': '',
                    'content_type': content_type,
                    'timestamp': datetime.now().isoformat()
                }
            # Clean and structure content
            text_content = main_content.get_text(separator='\n', strip=True)
            return {
                'content': self.advanced_text_cleaning(text_content),
                'content_type': content_type,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"HTML processing failed: {e}")
            return None
class FileProcessor:
    """Turn an uploaded text file or ZIP archive into a list of content records.

    Args:
        max_file_size: Uploads larger than this many bytes are rejected. The
            same limit is applied to the declared uncompressed size of a ZIP.
        large_file_threshold: Single files larger than this many bytes are
            not read in full; only their head and tail are kept.
        preview_bytes: Number of bytes kept from each end of a large file.
    """

    # Inserted between the head and the tail of a truncated large file.
    TRUNCATION_MARKER = "\n...[Content truncated due to large file size]...\n"

    def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024,  # 2GB default
                 large_file_threshold: int = 100 * 1024 * 1024,  # 100MB
                 preview_bytes: int = 1 * 1024 * 1024):  # 1MB
        self.max_file_size = max_file_size
        self.large_file_threshold = large_file_threshold
        self.preview_bytes = preview_bytes
        self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}

    @staticmethod
    def _resolve_path(file) -> str:
        """Return the filesystem path of *file*.

        Accepts a plain path (``str`` or ``Path``) as well as an upload object
        exposing the path as ``.name``. ``Path`` is checked first because
        ``Path.name`` is only the final path component.
        """
        if isinstance(file, (str, Path)):
            return str(file)
        return file.name

    def is_text_file(self, filepath: str) -> bool:
        """Check if file is a text file (by guessed MIME type or by extension)."""
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            if mime_type and mime_type.startswith('text/'):
                return True
            return os.path.splitext(filepath)[1].lower() in self.supported_text_extensions
        except Exception:
            return False

    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file and return its content records.

        Returns an empty list when *file* is falsy, exceeds ``max_file_size``
        or cannot be processed; errors are logged, never raised.
        """
        if not file:
            return []
        try:
            path = self._resolve_path(file)
            file_size = os.path.getsize(path)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []
            if zipfile.is_zipfile(path):
                # The extraction directory is only needed for archives.
                with tempfile.TemporaryDirectory() as temp_dir:
                    return self._process_zip_file(path, temp_dir)
            return self._process_single_file(file)
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []

    def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
        """Extract *zip_path* into *temp_dir* and collect every non-empty text file."""
        results = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            # Guard against archives that expand far beyond the upload limit.
            # This relies on the sizes declared in the archive directory.
            declared_size = sum(info.file_size for info in zip_ref.infolist())
            if declared_size > self.max_file_size:
                logger.warning(
                    f"Uncompressed ZIP size ({declared_size} bytes) exceeds maximum allowed size"
                )
                return results
            zip_ref.extractall(temp_dir)
        for root, _, files in os.walk(temp_dir):
            for filename in files:
                filepath = os.path.join(root, filename)
                if not self.is_text_file(filepath):
                    continue
                try:
                    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
                    if content.strip():
                        results.append({
                            "source": "file",
                            "filename": filename,
                            "content": content,
                            "timestamp": datetime.now().isoformat()
                        })
                except Exception as e:
                    logger.error(f"Error reading file {filename}: {str(e)}")
        return results

    def _read_head_and_tail(self, path: str, chunk_size: int) -> str:
        """Return the first and last *chunk_size* bytes of *path* as text.

        The file is read in binary mode because seeking to an arbitrary byte
        offset is not supported on text-mode handles. Bytes that do not form
        valid UTF-8 (including a character split by the cut) are dropped.
        """
        with open(path, 'rb') as f:
            head = f.read(chunk_size)
            f.seek(0, os.SEEK_END)
            total = f.tell()
            # Never start the tail inside the head, so nothing is duplicated.
            f.seek(max(len(head), total - chunk_size))
            tail = f.read()
        return (
            head.decode('utf-8', errors='ignore')
            + self.TRUNCATION_MARKER
            + tail.decode('utf-8', errors='ignore')
        )

    def _process_single_file(self, file) -> List[Dict]:
        """Read one file and return a single-element list describing it.

        Returns an empty list (and logs the error) if the file cannot be read.
        """
        try:
            path = self._resolve_path(file)
            file_stat = os.stat(path)
            if file_stat.st_size > self.large_file_threshold:
                # Very large files are summarised by their head and tail only.
                logger.info(f"Processing large file: {path} ({file_stat.st_size} bytes)")
                content = self._read_head_and_tail(path, self.preview_bytes)
            else:
                # Regular file processing
                with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            return [{
                'source': 'file',
                'filename': os.path.basename(path),
                'file_size': file_stat.st_size,
                'mime_type': mimetypes.guess_type(path)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
def clean_json(data: Union[str, Dict]) -> Optional[Dict]:
    """Return a normalised copy of *data*, or ``None`` if it is not valid JSON.

    A string argument is stripped and parsed first. The parsed value is then
    serialised and re-parsed so that the result contains plain JSON types
    only. Failures are logged and reported as ``None``.
    """
    try:
        parsed = json.loads(data.strip()) if isinstance(data, str) else data
        # Round-trip through the encoder to guarantee JSON-compatible output.
        serialised = json.dumps(parsed)
        return json.loads(serialised)
    except json.JSONDecodeError as e:
        logger.error(f"JSON cleaning error: {e}")
    except Exception as e:
        logger.error(f"Unexpected error while cleaning JSON: {e}")
    return None
def _render_qr_image(payload, output_path: Path) -> str:
    """Encode *payload* as JSON into a QR image saved at *output_path*; return the path."""
    qr = qrcode.QRCode(
        version=None,  # let the library pick the smallest version that fits
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )
    qr.add_data(json.dumps(payload, ensure_ascii=False))
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")
    img.save(str(output_path))
    return str(output_path)


def generate_qr_code(data: Union[str, Dict], combined: bool = True) -> List[str]:
    """Generate QR code(s) from data.

    Args:
        data: JSON text or an already-parsed JSON value. A list is treated as
            a collection of items when ``combined`` is False.
        combined: When True, all of *data* goes into one QR code. When False,
            a list yields one QR code per item and any other value yields a
            single QR code.

    Returns:
        Paths of the PNG files written under ``output/qr_codes``. Payloads that
        are not valid JSON, or that cannot be rendered (for example because
        they exceed QR capacity), are logged and skipped without discarding
        the images that were generated successfully. Never raises.
    """
    try:
        output_dir = Path('output/qr_codes')
        output_dir.mkdir(parents=True, exist_ok=True)
        # Microsecond resolution keeps successive calls from overwriting
        # each other's files.
        stamp = int(time.time() * 1_000_000)

        if combined:
            jobs = [(f'combined_qr_{stamp}.png', data)]
        elif isinstance(data, list):
            jobs = [(f'item_{idx}_qr_{stamp}.png', item) for idx, item in enumerate(data)]
        else:
            jobs = [(f'single_qr_{stamp}.png', data)]

        paths = []
        for filename, payload in jobs:
            cleaned = clean_json(payload)
            # None signals invalid JSON; other falsy values ({} / 0) are valid.
            if cleaned is None:
                continue
            try:
                paths.append(_render_qr_image(cleaned, output_dir / filename))
            except Exception as e:
                logger.error(f"QR generation error for {filename}: {e}")
        return paths
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return []
def create_interface():
    """Create the Gradio interface and return the (not yet launched) ``gr.Blocks``.

    The UI offers three input tabs (URLs, file upload, JSON notepad), a
    checkbox choosing between one combined QR code and one per item, and
    three outputs: the processed JSON, a gallery of QR images and a status
    line.
    """
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; padding: 10px; border-radius: 4px; }
    .error { background-color: #f8d7da; color: #721c24; padding: 10px; border-radius: 4px; }
    .success { background-color: #d4edda; color: #155724; padding: 10px; border-radius: 4px; }
    """

    def load_example():
        """Return a pretty-printed sample JSON document for the notepad."""
        example_json = {
            "type": "product_catalog",
            "items": [
                {
                    "id": "123",
                    "name": "Test Product",
                    "description": "This is a test product description",
                    "price": 29.99,
                    "category": "electronics",
                    "tags": ["test", "sample", "demo"]
                },
                {
                    "id": "456",
                    "name": "Another Product",
                    "description": "Another test product description",
                    "price": 49.99,
                    "category": "accessories",
                    "tags": ["sample", "test"]
                }
            ],
            "metadata": {
                "timestamp": datetime.now().isoformat(),
                "version": "1.0",
                "source": "example"
            }
        }
        return json.dumps(example_json, indent=2)

    def clear_input():
        """Return the empty string used to reset the notepad."""
        return ""

    def process_all_inputs(urls, file, text, combine):
        """Process all input types and generate QR codes.

        Returns a ``(json_data, image_paths, status_message)`` tuple matching
        the three output components.
        """
        try:
            results = []
            skipped_urls = []  # URLs dropped, with the reason, for the status line

            # Process text input first (since it's direct JSON)
            if text and text.strip():
                try:
                    json_data = json.loads(text)
                except json.JSONDecodeError as e:
                    return None, [], f"❌ Invalid JSON format: {str(e)}"
                if isinstance(json_data, list):
                    results.extend(json_data)
                else:
                    results.append(json_data)

            # Process URLs if provided
            if urls and urls.strip():
                processor = URLProcessor()
                url_list = [url.strip() for url in re.split(r'[,\n]', urls) if url.strip()]
                for url in url_list:
                    validation = processor.validate_url(url)
                    if not validation.get('is_valid'):
                        skipped_urls.append(f"{url} ({validation.get('message')})")
                        continue
                    content = processor.fetch_content(url)
                    if not content:
                        skipped_urls.append(f"{url} (no content could be fetched)")
                        continue
                    results.append({
                        'source': 'url',
                        'url': url,
                        'content': content,
                        'timestamp': datetime.now().isoformat()
                    })

            # Process files if provided
            if file:
                file_results = FileProcessor().process_file(file)
                if file_results:
                    results.extend(file_results)

            # Tell the user which URLs were dropped instead of ignoring them silently.
            skipped_note = ""
            if skipped_urls:
                skipped_note = (
                    f"\n⚠️ Skipped {len(skipped_urls)} URL(s): " + "; ".join(skipped_urls)
                )

            if not results:
                return (
                    None,
                    [],
                    "⚠️ No valid content to process. Please provide some input data." + skipped_note
                )

            # Generate QR codes
            qr_paths = generate_qr_code(results, combined=combine)
            if not qr_paths:
                return (
                    None,
                    [],
                    "❌ Failed to generate QR codes. Please check the input data." + skipped_note
                )
            return (
                results,
                [str(path) for path in qr_paths],
                f"✅ Successfully processed {len(results)} items and generated {len(qr_paths)} QR code(s)!"
                + skipped_note
            )
        except Exception as e:
            logger.error(f"Processing error: {e}")
            return None, [], f"❌ Error: {str(e)}"

    with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface:
        gr.Markdown("# Advanced Data Processing & QR Code Generator")
        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com",
                value=""
            )
        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload text file or ZIP archive",
                file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
            )
        with gr.Tab("Notepad"):
            text_input = gr.TextArea(
                label="JSON Data Input",
                lines=15,
                placeholder="Paste your JSON data here...",
                value=""
            )
            with gr.Row():
                example_btn = gr.Button("Load Example JSON", variant="secondary")
                clear_btn = gr.Button("🗑️ Clear Input", variant="secondary")
        with gr.Row():
            combine_data = gr.Checkbox(
                label="Combine all data into single QR code",
                value=True,
                info="Generate one QR code for all data, or separate QR codes for each item"
            )
            process_btn = gr.Button("Process & Generate QR", variant="primary", scale=2)
        output_json = gr.JSON(label="Processed JSON Data")
        output_gallery = gr.Gallery(label="Generated QR Codes", columns=2, height=400)
        output_text = gr.Textbox(label="Processing Status", interactive=False)

        # Set up event handlers
        example_btn.click(load_example, outputs=[text_input])
        clear_btn.click(clear_input, outputs=[text_input])
        process_btn.click(
            process_all_inputs,
            inputs=[url_input, file_input, text_input, combine_data],
            outputs=[output_json, output_gallery, output_text]
        )

        gr.Markdown("""
        ### Features

        - **URL Processing**: Extract content from websites
        - **File Processing**: Handle text files and archives
        - **Notepad**: Direct JSON data input/manipulation
        - **JSON Cleaning**: Automatic JSON validation and formatting
        - **QR Generation**: Generate QR codes with embedded JSON data
        - **Flexible Output**: Choose between combined or separate QR codes

        ### Usage Tips

        1. Use the **Notepad** tab for direct JSON input
        2. Click "Load Example JSON" to see a sample format
        3. Choose whether to combine all data into a single QR code
        4. The generated QR codes will contain the complete JSON data
        """)
    return interface
def main():
    """Prepare the runtime environment and launch the Gradio interface.

    The server listens on all interfaces. The port defaults to 8000 and can
    be overridden with the ``GRADIO_SERVER_PORT`` environment variable, so a
    hosting platform can dictate the port without a code change.
    """
    # Configure system settings
    mimetypes.init()
    # Create output directories
    Path('output/qr_codes').mkdir(parents=True, exist_ok=True)
    # Create and launch interface
    interface = create_interface()

    default_port = 8000
    raw_port = os.environ.get('GRADIO_SERVER_PORT')
    try:
        server_port = int(raw_port) if raw_port else default_port
    except ValueError:
        logger.warning(f"Invalid GRADIO_SERVER_PORT value {raw_port!r}; using {default_port}.")
        server_port = default_port

    # Launch with proper configuration
    interface.launch(
        server_name="0.0.0.0",
        server_port=server_port,
        show_error=True,
        share=False,
        inbrowser=True,
        debug=True
    )


if __name__ == "__main__":
    main()