Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import requests | |
| DEFAULT_FILES_DIR = "files" # Subdirectory for task-related files | |
| FILE_API_BASE_URL = "https://agents-course-unit4-scoring.hf.space/files/" | |
| def _extract_filename_from_cd(cd_header: str | None) -> str | None: | |
| """Extracts filename from Content-Disposition header.""" | |
| if not cd_header: | |
| return None | |
| # Check for filename*=UTF-8''<encoded_filename> | |
| fname_star_match = re.search( | |
| r"filename\*=UTF-8''([^';\s]+)", cd_header, re.IGNORECASE) | |
| if fname_star_match: | |
| return requests.utils.unquote(fname_star_match.group(1)) | |
| # Check for filename="<filename>" | |
| fname_match = re.search(r'filename="([^"]+)"', cd_header, re.IGNORECASE) | |
| if fname_match: | |
| return fname_match.group(1) | |
| # Check for plain filename=<filename> | |
| fname_plain_match = re.search( | |
| r'filename=([^;"]+)', cd_header, re.IGNORECASE) | |
| if fname_plain_match: | |
| return fname_plain_match.group(1).strip('"') | |
| return None | |
| def _get_extension_from_content_type(content_type: str | None) -> str | None: | |
| """Suggests a file extension based on MIME type.""" | |
| if not content_type: | |
| return None | |
| # Simple mapping, can be expanded | |
| mime_to_ext = { | |
| 'text/plain': '.txt', | |
| 'application/json': '.json', | |
| 'text/csv': '.csv', | |
| 'application/pdf': '.pdf', | |
| 'image/jpeg': '.jpg', | |
| 'image/png': '.png', | |
| 'text/x-python': '.py', | |
| # Often used as a generic, extension might be in filename | |
| 'application/octet-stream': '' | |
| } | |
| # Get the main type/subtype part | |
| main_type = content_type.split(';')[0].strip().lower() | |
| return mime_to_ext.get(main_type) | |
def get_task_file_path(task_id: str, local_files_dir: str = DEFAULT_FILES_DIR) -> str | None:
    """Return the absolute path of the file attached to a task.

    The local directory is searched first for a regular file whose name
    starts with ``task_id``. If none exists, the file is requested from
    ``FILE_API_BASE_URL + task_id`` and stored in ``local_files_dir`` under a
    name that starts with ``task_id``, so later calls find it locally.

    Progress and errors are printed to stdout; failures are reported by
    returning ``None`` rather than by raising.

    Args:
        task_id: Identifier of the task; also the filename prefix used for
            the local lookup.
        local_files_dir: Directory holding task files. Created if missing.

    Returns:
        Absolute path of the existing or freshly downloaded file, or
        ``None`` when ``task_id`` is empty, the server has no file (HTTP
        404), the server sent an empty body or an unexpected status, or a
        network/filesystem error occurred.
    """
    if not task_id:
        # str.startswith("") is always True, so an empty id would match any
        # cached file and would request the bare API base URL.
        print("FileHandler: No task_id provided; cannot look up a task file.")
        return None

    try:
        os.makedirs(local_files_dir, exist_ok=True)
    except OSError as e:
        print(
            f"FileHandler: Could not create directory {local_files_dir}: {e}")
        return None

    # 1. Check for an existing local file whose name starts with the task_id.
    try:
        # Sorted so the choice is deterministic when several files match.
        for filename in sorted(os.listdir(local_files_dir)):
            if not filename.startswith(task_id):
                continue
            candidate = os.path.join(local_files_dir, filename)
            if not os.path.isfile(candidate):
                continue  # Skip directories that happen to share the prefix.
            full_path = os.path.abspath(candidate)
            print(
                f"FileHandler: Found existing local file for task {task_id}: {full_path}")
            return full_path
    except OSError as e:
        print(
            f"FileHandler: Notice - Error listing files in {local_files_dir} (will attempt download): {e}")

    # 2. If not found locally, attempt to download.
    file_api_url = f"{FILE_API_BASE_URL}{task_id}"
    print(
        f"FileHandler: Local file for task {task_id} not found. Attempting download from: {file_api_url}")

    try:
        with requests.Session() as session:
            response = session.get(
                file_api_url, timeout=15, allow_redirects=True)

        if response.status_code == 404:
            print(
                f"FileHandler: No file found for task_id {task_id} at API (HTTP 404 Not Found).")
            return None

        if response.status_code != 200:
            print(
                f"FileHandler: Failed to download file for task {task_id}. Server responded with HTTP status {response.status_code}.")
            return None

        if not response.content:
            print(
                f"FileHandler: File indicated for task {task_id} but server sent no content (empty file). Not saving.")
            return None

        cd_header = response.headers.get('Content-Disposition')
        original_filename = _extract_filename_from_cd(cd_header)

        sane_filename_base = ""
        if original_filename:
            # The header is untrusted: keep only the final path component,
            # treating backslashes as separators on every platform.
            sane_filename_base = os.path.basename(
                original_filename.replace("\\", "/"))

        if not sane_filename_base:
            # No usable name in the header: derive one from the MIME type.
            content_type = response.headers.get('Content-Type')
            extension = _get_extension_from_content_type(content_type) or ''
            sane_filename_base = f"{task_id}_downloaded{extension}"
            print(
                f"FileHandler: No filename in Content-Disposition for {task_id}. Using fallback: {sane_filename_base}")

        # Ensure the filename starts with task_id so the local lookup above
        # finds it on later calls.
        if sane_filename_base.startswith(task_id):
            sane_filename = sane_filename_base
        else:
            sane_filename = f"{task_id}_{sane_filename_base}"

        file_path = os.path.join(local_files_dir, sane_filename)

        # Write under a temporary name and move it into place afterwards, so
        # a failed write never leaves a truncated file that a later call
        # would return as a valid cached copy. The leading dot keeps the
        # temporary name from matching the task_id prefix lookup.
        temp_path = os.path.join(local_files_dir, f".{sane_filename}.part")
        try:
            with open(temp_path, 'wb') as f:
                f.write(response.content)
            os.replace(temp_path, file_path)
        except OSError:
            try:
                os.remove(temp_path)
            except OSError:
                pass  # Nothing to clean up, or cleanup itself failed.
            raise

        abs_path = os.path.abspath(file_path)
        print(
            f"FileHandler: File '{sane_filename}' for task {task_id} downloaded to '{abs_path}'. Size: {len(response.content)} bytes.")
        return abs_path

    except requests.exceptions.Timeout:
        print(
            f"FileHandler: Request timed out while trying to download file for task ID '{task_id}'.")
        return None
    except requests.exceptions.RequestException as e:
        # Must precede the IOError handler: RequestException subclasses
        # IOError, so the order decides which message is printed.
        print(
            f"FileHandler: An error occurred during file download for task ID '{task_id}': {type(e).__name__} - {e}.")
        return None
    except IOError as e:  # Errors while writing the file to disk.
        print(
            f"FileHandler: An IO error occurred while saving the file for task ID '{task_id}': {e}")
        return None