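"""Evaluation queue worker.

Walks the local evaluation request files under EVAL_REQUESTS_PATH, runs every
PENDING request through all registered evaluators, and uploads the status and
result files back to the queue and results dataset repos on the Hugging Face Hub.
"""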
import json
import os
import traceback
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Dict, Optional

import torch
from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer

from src.evaluators import EVALUATOR_REGISTRY
from src.evaluators.base_evaluator import BaseEvaluator
from src.envs import API, EVAL_REQUESTS_PATH, QUEUE_REPO, RESULTS_REPO, TOKEN


class EvaluationStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"


@dataclass
class EvaluationResult:
    """Dataclass holding the results of a single model evaluation."""

    model: str
    revision: str
    precision: str
    weight_type: str
    results: Dict[str, float]
    error: Optional[str] = None


def evaluate_model(model_name: str, revision: str, precision: str, weight_type: str) -> EvaluationResult:
    """
    Evaluates a model on ALL registered tasks.
    """
    try:
        print(f"\nStarting evaluation for model: {model_name}")
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Load model & tokenizer ONCE
        print("Loading classification model and tokenizer...")
        classification_model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            revision=revision,
            torch_dtype=getattr(torch, precision),
            trust_remote_code=True,
        ).to(device)
        tokenizer = AutoTokenizer.from_pretrained(model_name, revision=revision)
        print("✅ Classification model loaded successfully.")

        print("Loading base model...")
        embedding_model = AutoModel.from_pretrained(
            model_name,
            revision=revision,
            torch_dtype=getattr(torch, precision),
            trust_remote_code=True,
        ).to(device)
        print("✅ Embedding model loaded successfully.")

        all_results = {}
        for task_name, EvaluatorClass in EVALUATOR_REGISTRY.items():
            print(f"\n--- Evaluating: {task_name} ---")
            try:
                # Select the model that matches the task type.
                if task_name == "Sentiment Analysis":
                    model = classification_model
                    print("Using classification model for Sentiment Analysis")
                elif task_name in ["Transliteration", "Normalization"]:
                    model = embedding_model
                    print(f"Using base embedding model for {task_name}")
                else:
                    raise ValueError(f"Unknown task for model selection: {task_name}")

                evaluator: BaseEvaluator = EvaluatorClass()
                result = evaluator.evaluate(model, tokenizer, device=device)

                # Extract the main metric (must be reported by every evaluator)
                all_results[task_name] = result["main_metric"]
                print(f"✅ {task_name}: {result['main_metric']:.4f}")
            except Exception as e:
                error_msg = f"Failed {task_name}: {str(e)}"
                print(f"❌ {error_msg}")
                all_results[task_name] = None  # keep the task listed with a null score

        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results=all_results,
        )
    except Exception as e:
        error_msg = f"Critical failure: {str(e)}"
        print(f"💥 {error_msg}")
        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results={},
            error=error_msg,
        )


def reset_stale_running_eval(eval_entry, root, file_path, filename, timeout_interval=10):
    """Reset a stale RUNNING entry back to PENDING once timeout_interval seconds have passed."""
    submission = eval_entry.get("submitted_time")
    try:
        started = datetime.fromisoformat(submission)  # aware datetime
    except Exception as e:
        print("Invalid submitted_time format:", submission, e)
        return

    now_utc = datetime.now(timezone.utc)
    if now_utc - started > timedelta(seconds=timeout_interval):
        print(f"Timeout detected: resetting {eval_entry['model']} to PENDING")
        eval_entry["status"] = EvaluationStatus.PENDING.value
        eval_entry["submitted_time"] = now_utc.isoformat()
        with open(file_path, 'w') as f:
            json.dump(eval_entry, f, indent=2)
        API.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=os.path.join(os.path.basename(root), filename),
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Update status to PENDING for {eval_entry['model']} (timeout)",
            token=TOKEN,
        )


def process_evaluation_queue():
    """
    Processes all pending evaluations in the queue.

    This function acts as a worker that finds a PENDING job, runs it,
    and updates the status on the Hugging Face Hub.
    """
    print("\n=== Starting evaluation queue processing ===")
    print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Looking for evaluation requests in: {EVAL_REQUESTS_PATH}")

    if not os.path.exists(EVAL_REQUESTS_PATH):
        print(f"Evaluation requests path does not exist: {EVAL_REQUESTS_PATH}")
        return

    for root, _, files in os.walk(EVAL_REQUESTS_PATH):
        for filename in files:
            if filename.endswith('.json'):
                file_path = os.path.join(root, filename)
                print(f"\nProcessing file: {file_path}")
                try:
                    with open(file_path, 'r') as f:
                        eval_entry = json.load(f)
                    status = eval_entry.get('status', '')

                    if status == EvaluationStatus.PENDING.value:
                        print(f"Found pending evaluation for model: {eval_entry['model']}")

                        # --- Step 1: Update status to RUNNING locally and on the Hub ---
                        eval_entry['status'] = EvaluationStatus.RUNNING.value
                        with open(file_path, 'w') as f:
                            json.dump(eval_entry, f, indent=2)
                        user_name = os.path.basename(root)
                        path_in_repo_queue = os.path.join(user_name, filename)
                        # Upload the updated file to the queue repo to reflect the RUNNING status
                        API.upload_file(
                            path_or_fileobj=file_path,
                            path_in_repo=path_in_repo_queue,
                            repo_id=QUEUE_REPO,
                            repo_type="dataset",
                            commit_message=f"Update status to RUNNING for {eval_entry['model']}",
                        )
                        print(f"Updated status to RUNNING in queue: {path_in_repo_queue}")

                        # --- Step 2: Run the evaluation ---
                        print("\n=== Starting evaluation ===")
                        eval_result = evaluate_model(
                            model_name=eval_entry['model'],
                            revision=eval_entry['revision'],
                            precision=eval_entry['precision'],
                            weight_type=eval_entry['weight_type'],
                        )
                        # Treat any task that produced no score as a failure and record which one.
                        for task_name, value in eval_result.results.items():
                            if value is None:
                                if eval_result.error is None:
                                    eval_result.error = ""
                                eval_result.error += f"Evaluation of {eval_entry['model']} failed for task '{task_name}'. "
                        print("\n=== Evaluation completed ===")
                        # --- Step 3: Update the file with the final status and results locally ---
                        if eval_result.error:
                            eval_entry['status'] = EvaluationStatus.FAILED.value
                            eval_entry['error'] = eval_result.error
                            print(f"Evaluation failed with error: {eval_result.error}")
                        else:
                            eval_entry['status'] = EvaluationStatus.FINISHED.value
                            eval_entry['results'] = eval_result.results
                            print(f"Evaluation finished successfully. Results: {eval_result.results}")
                        with open(file_path, 'w') as f:
                            json.dump(eval_entry, f, indent=2)

                        # --- Step 4: Upload the final file to the results repo on the Hub ---
                        try:
                            # Use the local file with its final status as the basis for the results file
                            path_in_repo_results = os.path.join(user_name, filename)
                            API.upload_file(
                                path_or_fileobj=file_path,
                                path_in_repo=path_in_repo_results,
                                repo_id=RESULTS_REPO,
                                repo_type="dataset",
                                commit_message=f"Evaluation {'results' if not eval_result.error else 'error'} for {eval_entry['model']}",
                            )
                            print("\nResults uploaded to Hugging Face successfully.")
                        except Exception as upload_error:
                            print(f"Error uploading results: {str(upload_error)}")

                        # --- Step 5: Update the status of the request in the queue to FINISHED/FAILED ---
                        # This keeps a record of all processed jobs in the queue repo.
                        try:
                            API.upload_file(
                                path_or_fileobj=file_path,
                                path_in_repo=path_in_repo_queue,
                                repo_id=QUEUE_REPO,
                                repo_type="dataset",
                                commit_message=f"Final status update for {eval_entry['model']}",
                            )
                            print(f"Final status for {eval_entry['model']} updated in the queue repository.")
                        except Exception as status_update_error:
                            print(f"Error updating status in queue: {str(status_update_error)}")

                    elif status == EvaluationStatus.RUNNING.value:
                        print(f"Found RUNNING evaluation for model: {eval_entry['model']}")
                        reset_stale_running_eval(eval_entry, root, file_path, filename)
                    else:
                        print(f"Skipping file with status: {status}")
                except Exception as e:
                    print(f"Error processing file {file_path}: {str(e)}")
                    print(f"Full traceback: {traceback.format_exc()}")
                    continue

    print("\n=== Evaluation queue processed. ===")
    print("No more pending jobs found.")
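

if __name__ == "__main__":
    # Minimal entry-point sketch (an assumption, not part of the original queue logic):
    # run a single pass over the request queue. The surrounding Space may instead call
    # process_evaluation_queue() from its app scheduler or a restart loop.
    process_evaluation_queue()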