Spaces:
Runtime error
Runtime error
| # Plaintext_Files.py | |
| # Description: This file contains functions for reading and writing plaintext files. | |
| # | |
| # Import necessary libraries | |
| import logging | |
| import os | |
| import tempfile | |
| import zipfile | |
| from datetime import datetime | |
| # | |
| # External Imports | |
| from docx2txt import docx2txt | |
| from pypandoc import convert_file | |
| # | |
| # Local Imports | |
| from App_Function_Libraries.Gradio_UI.Import_Functionality import import_data | |
| from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram | |
| # | |
| ####################################################################################################################### | |
| # | |
| # Function Definitions | |
| def import_plain_text_file(file_path, author, keywords, system_prompt, user_prompt, auto_summarize, api_name, api_key): | |
| """Import a single plain text file.""" | |
| try: | |
| log_counter("file_processing_attempt", labels={"file_path": file_path}) | |
| # Extract title from filename | |
| title = os.path.splitext(os.path.basename(file_path))[0] | |
| # Determine the file type and convert if necessary | |
| file_extension = os.path.splitext(file_path)[1].lower() | |
| # Get the content based on file type | |
| try: | |
| if file_extension == '.rtf': | |
| with tempfile.NamedTemporaryFile(suffix='.md', delete=False) as temp_file: | |
| convert_file(file_path, 'md', outputfile=temp_file.name) | |
| file_path = temp_file.name | |
| with open(file_path, 'r', encoding='utf-8') as file: | |
| content = file.read() | |
| log_counter("rtf_conversion_success", labels={"file_path": file_path}) | |
| elif file_extension == '.docx': | |
| content = docx2txt.process(file_path) | |
| log_counter("docx_conversion_success", labels={"file_path": file_path}) | |
| else: | |
| with open(file_path, 'r', encoding='utf-8') as file: | |
| content = file.read() | |
| except Exception as e: | |
| logging.error(f"Error reading file content: {str(e)}") | |
| return f"Error reading file content: {str(e)}" | |
| # Import the content | |
| result = import_data( | |
| content, # Pass the content directly | |
| title, | |
| author, | |
| keywords, | |
| user_prompt, # This is the custom_prompt parameter | |
| None, # No summary - let auto_summarize handle it | |
| auto_summarize, | |
| api_name, | |
| api_key | |
| ) | |
| log_counter("file_processing_success", labels={"file_path": file_path}) | |
| return result | |
| except Exception as e: | |
| logging.exception(f"Error processing file {file_path}") | |
| log_counter("file_processing_error", labels={"file_path": file_path, "error": str(e)}) | |
| return f"Error processing file {os.path.basename(file_path)}: {str(e)}" | |
| def process_plain_text_zip_file(zip_file, author, keywords, system_prompt, user_prompt, auto_summarize, api_name, api_key): | |
| """Process multiple text files from a zip archive.""" | |
| results = [] | |
| try: | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| with zipfile.ZipFile(zip_file.name, 'r') as zip_ref: | |
| zip_ref.extractall(temp_dir) | |
| for filename in os.listdir(temp_dir): | |
| if filename.lower().endswith(('.md', '.txt', '.rtf', '.docx')): | |
| file_path = os.path.join(temp_dir, filename) | |
| result = import_plain_text_file( | |
| file_path=file_path, | |
| author=author, | |
| keywords=keywords, | |
| system_prompt=system_prompt, | |
| user_prompt=user_prompt, | |
| auto_summarize=auto_summarize, | |
| api_name=api_name, | |
| api_key=api_key | |
| ) | |
| results.append(f"π {filename}: {result}") | |
| return "\n\n".join(results) | |
| except Exception as e: | |
| logging.exception(f"Error processing zip file: {str(e)}") | |
| return f"Error processing zip file: {str(e)}" | |
| def import_file_handler(files, author, keywords, system_prompt, user_prompt, auto_summarize, api_name, api_key): | |
| """Handle the import of one or more files, including zip files.""" | |
| try: | |
| if not files: | |
| log_counter("plaintext_import_error", labels={"error": "No files uploaded"}) | |
| return "No files uploaded." | |
| # Convert single file to list for consistent processing | |
| if not isinstance(files, list): | |
| files = [files] | |
| results = [] | |
| for file in files: | |
| log_counter("plaintext_import_attempt", labels={"file_name": file.name}) | |
| start_time = datetime.now() | |
| if not os.path.exists(file.name): | |
| log_counter("plaintext_import_error", labels={"error": "File not found", "file_name": file.name}) | |
| results.append(f"β File not found: {file.name}") | |
| continue | |
| if file.name.lower().endswith(('.md', '.txt', '.rtf', '.docx')): | |
| result = import_plain_text_file( | |
| file_path=file.name, | |
| author=author, | |
| keywords=keywords, | |
| system_prompt=system_prompt, | |
| user_prompt=user_prompt, | |
| auto_summarize=auto_summarize, | |
| api_name=api_name, | |
| api_key=api_key | |
| ) | |
| log_counter("plaintext_import_success", labels={"file_name": file.name}) | |
| results.append(f"π {file.name}: {result}") | |
| elif file.name.lower().endswith('.zip'): | |
| result = process_plain_text_zip_file( | |
| zip_file=file, | |
| author=author, | |
| keywords=keywords, | |
| system_prompt=system_prompt, | |
| user_prompt=user_prompt, | |
| auto_summarize=auto_summarize, | |
| api_name=api_name, | |
| api_key=api_key | |
| ) | |
| log_counter("zip_import_success", labels={"file_name": file.name}) | |
| results.append(f"π¦ {file.name}:\n{result}") | |
| else: | |
| log_counter("unsupported_file_type", labels={"file_type": file.name.split('.')[-1]}) | |
| results.append(f"β Unsupported file type: {file.name}") | |
| continue | |
| end_time = datetime.now() | |
| processing_time = (end_time - start_time).total_seconds() | |
| log_histogram("plaintext_import_duration", processing_time, labels={"file_name": file.name}) | |
| return "\n\n".join(results) | |
| except Exception as e: | |
| logging.exception("Error in import_file_handler") | |
| log_counter("plaintext_import_error", labels={"error": str(e)}) | |
| return f"β Error during import: {str(e)}" | |
| # | |
| # End of Plaintext_Files.py | |
| ####################################################################################################################### | |