"""Small FastAPI service that serves GAIA level-1 questions and scores answers."""

import os
import random
from typing import Dict, List, Union

from datasets import load_dataset
from fastapi import FastAPI
from huggingface_hub import login
from pydantic import BaseModel

app = FastAPI()

# Log in to the Hugging Face Hub so the gated dataset can be accessed.
token = os.getenv("token_curso")
if token:
    login(token)
else:
    print("⚠️ WARNING: No se encontró el token de autenticación en la variable token_curso")

# Load the GAIA level-1 validation split from the auto-converted parquet
# revision. (An earlier variant of this call selected the "2023_level1" config
# by name and passed trust_remote_code=True.)
ds = load_dataset(
    "gaia-benchmark/GAIA",
    data_dir="2023_level1",
    split="validation",
    revision="refs/convert/parquet",
)

# Public question list (task id + question text) and the private answer key,
# both keyed by the stringified task id.
QUESTIONS: List[Dict[str, str]] = []
GROUND_TRUTH: Dict[str, str] = {}
for item in ds:
    task_id = str(item["task_id"])
    QUESTIONS.append({"task_id": task_id, "question": item["Question"]})
    GROUND_TRUTH[task_id] = str(item["Final answer"])


class AnswerItem(BaseModel):
    """A single answer submitted for one task."""

    # Identifier of the task being answered; looked up in GROUND_TRUTH.
    task_id: str
    # The agent's answer; converted with str() before being compared.
    submitted_answer: Union[str, int, float]


class Submission(BaseModel):
    """Request body of POST /submit."""

    username: str
    agent_code: str
    answers: List[AnswerItem]


class ScoreResponse(BaseModel):
    """Response body of POST /submit."""

    username: str
    # Percentage of submitted answers that were correct (0.0 - 100.0).
    score: float
    correct_count: int
    # Number of answers contained in the submission.
    total_attempted: int
    message: str


@app.get("/questions")
def get_questions():
    """Return up to 20 level-1 questions, sampled at random on every call."""
    sample_size = min(20, len(QUESTIONS))
    return random.sample(QUESTIONS, k=sample_size)


@app.post("/submit")
def submit(sub: Submission):
    """Score a submission against the ground-truth answers.

    An answer is correct when its task id is known and its string form,
    stripped of surrounding whitespace, equals the stripped ground truth.
    Answers for unknown task ids count as attempted but never as correct.

    Args:
        sub: The username, agent code reference and list of answers.

    Returns:
        A ScoreResponse with the number of correct answers, the number of
        submitted answers, and the percentage score (0.0 when the submission
        contains no answers).
    """
    correct = 0
    for ans in sub.answers:
        expected = GROUND_TRUTH.get(ans.task_id)
        if expected is None:
            # Unknown task id: defaulting to "" here would let an empty
            # submitted answer be scored as correct.
            continue
        if expected.strip() == str(ans.submitted_answer).strip():
            correct += 1

    total = len(sub.answers)
    score = correct / total * 100 if total > 0 else 0.0
    return ScoreResponse(
        username=sub.username,
        score=score,
        correct_count=correct,
        total_attempted=total,
        message=f"Puntuación: {correct}/{total} = {score:.1f}%",
    )


# GET /random-question: Fetch a single random question from the list.
@app.get("/random-question") def get_random_question(): if not QUESTIONS: return {"error": "No questions available"} question = random.choice(QUESTIONS) return question # GET /files/{task_id}: Download a specific file associated with a given task ID. @app.get("/files/{task_id}") def get_file(task_id: str): file_path = f"files/{task_id}.txt" if not os.path.exists(file_path): return {"error": "File not found"} with open(file_path, "r") as file: content = file.read() return {"task_id": task_id, "content": content} from fastapi.responses import HTMLResponse @app.get("/", response_class=HTMLResponse) def read_root(): return """