|
|
import os |
|
|
import requests |
|
|
import json |
|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
from smolagents import CodeAgent, InferenceClientModel, DuckDuckGoSearchTool |
|
|
from typing import List, Dict, Any |
|
|
from dotenv import load_dotenv |
|
|
import mimetypes |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Load variables from a local .env file (if present) into the environment
# before any of the os.getenv() lookups below run.
load_dotenv()

# Base URL of the scoring service; can be overridden with API_BASE_URL.
API_BASE_URL = os.getenv("API_BASE_URL", "https://agents-course-unit4-scoring.hf.space")

# URL reported as "agent code" on submission. YOUR_HF_SPACE_URL is the
# user-editable value; DEFAULT_HF_SPACE_URL is what it is compared against to
# detect whether it has been customised.
DEFAULT_HF_SPACE_URL = "https://huggingface.co/spaces/tuannhtn80/my-agents-course-final-project/tree/main"
YOUR_HF_SPACE_URL = "https://huggingface.co/spaces/tuannhtn80/my-agents-course-final-project/tree/main"

# Hugging Face access token used by the agent's model client.
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    # Only warn here; setup_agent() is where a missing token becomes an error.
    print(
        "\n".join(
            (
                "WARNING: HF_TOKEN not found in environment variables.",
                "Please add your Hugging Face token as a Secret in this Space.",
                "Go to Settings > Secrets > New secret",
                "Name: HF_TOKEN, Value: hf_...",
            )
        )
    )

# Scoring-service endpoints derived from the base URL.
QUESTIONS_URL = API_BASE_URL + "/questions"
SUBMIT_URL = API_BASE_URL + "/submit"

# Local cache files for fetched questions and incrementally saved answers.
QUESTIONS_FILE = "questions.json"
ANSWERS_FILE = "answers.json"
|
|
|
|
|
from smolagents import ( |
|
|
CodeAgent, |
|
|
DuckDuckGoSearchTool, |
|
|
PythonInterpreterTool, |
|
|
tool, |
|
|
InferenceClientModel |
|
|
) |
|
|
from typing import List, Dict, Any, Optional |
|
|
import os |
|
|
import tempfile |
|
|
import re |
|
|
import json |
|
|
import requests |
|
|
from urllib.parse import urlparse |
|
|
|
|
|
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a temporary file and return the path.
    Useful for processing files from the GAIA API.

    Args:
        content: The content to save to the file
        filename: Optional filename, will generate a random name if not provided

    Returns:
        Path to the saved file
    """
    # Keep only the final path component so a name such as "../../x" or an
    # absolute path cannot make the tool write outside the temp directory.
    safe_name = os.path.basename(filename) if filename else ""

    if safe_name:
        filepath = os.path.join(tempfile.gettempdir(), safe_name)
        handle = open(filepath, "w", encoding="utf-8")
    else:
        # mkstemp returns an already-open descriptor; wrapping it guarantees
        # it is closed, instead of leaking a NamedTemporaryFile handle.
        fd, filepath = tempfile.mkstemp()
        handle = os.fdopen(fd, "w", encoding="utf-8")

    with handle:
        handle.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."
|
|
|
|
|
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.

    Args:
        url: The URL to download from
        filename: Optional filename, will generate one based on URL if not provided

    Returns:
        Path to the downloaded file
    """
    try:
        import uuid

        # Prefer the caller's name, otherwise the last segment of the URL path.
        candidate = filename or os.path.basename(urlparse(url).path)
        # Drop any directory part so the file always lands inside the temp dir.
        safe_name = os.path.basename(candidate) if candidate else ""
        if not safe_name:
            safe_name = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), safe_name)

        # The timeout keeps a stalled server from blocking the agent forever;
        # the context manager releases the streamed connection when done.
        with requests.get(url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)

        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        # Tool boundary: report the failure to the agent as text instead of raising.
        return f"Error downloading file: {str(e)}"
|
|
|
|
|
@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using pytesseract (if available).

    Args:
        image_path: Path to the image file

    Returns:
        Extracted text or error message
    """
    try:
        # Imported lazily so the module still loads when OCR support is missing.
        import pytesseract
        from PIL import Image

        # The context manager closes the underlying file handle after OCR.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)

        return f"Extracted text from image:\n\n{text}"
    except ImportError:
        return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system."
    except Exception as e:
        # Tool boundary: report the failure to the agent as text instead of raising.
        return f"Error extracting text from image: {str(e)}"
|
|
|
|
|
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Analyze a CSV file using pandas and answer a question about it.

    Args:
        file_path: Path to the CSV file
        query: Question about the data

    Returns:
        Analysis result or error message
    """
    # NOTE: `query` is accepted for interface compatibility but is not used to
    # filter or compute anything; the tool always returns a generic overview.
    try:
        import pandas as pd

        df = pd.read_csv(file_path)

        # Column labels are not guaranteed to be strings (e.g. numeric
        # headers), so convert them before joining.
        column_names = ', '.join(str(column) for column in df.columns)

        result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(df.describe())

        return result
    except ImportError:
        return "Error: pandas is not installed. Please install it with 'pip install pandas'."
    except Exception as e:
        # Tool boundary: report the failure to the agent as text instead of raising.
        return f"Error analyzing CSV file: {str(e)}"
|
|
|
|
|
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and answer a question about it.

    Args:
        file_path: Path to the Excel file
        query: Question about the data

    Returns:
        Analysis result or error message
    """
    # NOTE: `query` is accepted for interface compatibility but is not used to
    # filter or compute anything; the tool always returns a generic overview.
    try:
        import pandas as pd

        df = pd.read_excel(file_path)

        # Spreadsheet headers are often numbers or dates; convert every label
        # to text so the join cannot raise TypeError.
        column_names = ', '.join(str(column) for column in df.columns)

        result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(df.describe())

        return result
    except ImportError:
        return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'."
    except Exception as e:
        # Tool boundary: report the failure to the agent as text instead of raising.
        return f"Error analyzing Excel file: {str(e)}"
|
|
|
|
|
class GAIAAgent:
    """Question-answering agent for GAIA-style tasks built on smolagents.

    Wraps a ``CodeAgent`` driven by an ``InferenceClientModel`` and equips it
    with web search, a Python interpreter and the file helper tools defined in
    this module. Answers are post-processed so they contain only the bare
    answer text.
    """

    # Model used when the caller does not pass ``model_id``.
    DEFAULT_MODEL_ID = "deepseek-ai/DeepSeek-V3.1"

    # Lead-in phrases stripped from the start of an answer, checked in order.
    _ANSWER_PREFIXES = (
        "The answer is ",
        "Answer: ",
        "Final answer: ",
        "The result is ",
        "To answer this question: ",
        "Based on the information provided, ",
        "According to the information: ",
    )

    def __init__(
        self,
        model_type: str = "InferenceClientModel",
        model_id: Optional[str] = None,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        temperature: float = 0.2,
        executor_type: str = "local",
        additional_imports: Optional[List[str]] = None,
        additional_tools: Optional[List[Any]] = None,
        system_prompt: Optional[str] = None,
        verbose: bool = False,
        provider: Optional[str] = None,
        timeout: Optional[int] = None,
    ):
        """
        Initialize a GAIAAgent with specified configuration

        Args:
            model_type: Type of model to use (only "InferenceClientModel" is supported)
            model_id: ID of the model to use; falls back to DEFAULT_MODEL_ID when omitted
            api_key: API key for the model provider; falls back to the HF_TOKEN env var
            api_base: Base URL for API calls (accepted for compatibility, currently unused)
            temperature: Temperature for text generation
            executor_type: Type of executor for code execution ('local' or 'e2b')
            additional_imports: Additional Python modules to allow importing
            additional_tools: Additional tools to provide to the agent
            system_prompt: Custom system prompt (stored only, kept for backward compatibility)
            verbose: Enable verbose logging
            provider: Provider for InferenceClientModel (e.g., "hf-inference")
            timeout: Timeout in seconds for API calls (defaults to 120)

        Raises:
            ValueError: If ``model_type`` is unknown or no token is available.
        """
        self.verbose = verbose
        self.system_prompt = system_prompt

        if model_type != "InferenceClientModel":
            raise ValueError(f"Unknown model type: {model_type}")

        if api_key is None:
            api_key = os.getenv("HF_TOKEN")
        if not api_key:
            # The message names the variable that is actually read above.
            raise ValueError("No Hugging Face token provided. Please set the HF_TOKEN environment variable or pass api_key parameter.")

        if self.verbose:
            # Never echo any part of the secret into the logs.
            print("Using Hugging Face token (value hidden)")

        # Honour the caller's model choice; previously it was silently ignored.
        resolved_model_id = model_id or self.DEFAULT_MODEL_ID
        model_kwargs = {
            "model_id": resolved_model_id,
            "token": api_key,
            "timeout": timeout or 120,
            "temperature": temperature,
        }
        if provider is not None:
            # Only forwarded when set, so the client's own default applies otherwise.
            model_kwargs["provider"] = provider
        self.model = InferenceClientModel(**model_kwargs)

        if self.verbose:
            print(f"Initialized model: {model_type} - {resolved_model_id}")

        self.tools = [
            DuckDuckGoSearchTool(),
            PythonInterpreterTool(),
            save_and_read_file,
            download_file_from_url,
            analyze_csv_file,
            analyze_excel_file,
        ]

        # The OCR tool is registered only when its dependencies import cleanly.
        try:
            import pytesseract  # noqa: F401
            from PIL import Image  # noqa: F401
            self.tools.append(extract_text_from_image)
            if self.verbose:
                print("Added image processing tool")
        except ImportError:
            if self.verbose:
                print("Image processing libraries not available")

        if additional_tools:
            self.tools.extend(additional_tools)

        if self.verbose:
            print(f"Initialized with {len(self.tools)} tools")

        # Modules the agent's generated code is allowed to import.
        self.imports = ["pandas", "numpy", "datetime", "json", "re", "math", "os", "requests", "csv", "urllib"]
        if additional_imports:
            self.imports.extend(additional_imports)

        executor_kwargs = {}
        if executor_type == "e2b":
            try:
                # Import is only an availability probe for the e2b sandbox.
                from e2b_code_interpreter import Sandbox  # noqa: F401
                if self.verbose:
                    print("Using e2b executor")
            except ImportError:
                if self.verbose:
                    print("e2b dependencies not found, falling back to local executor")
                executor_type = "local"

        self.agent = CodeAgent(
            tools=self.tools,
            model=self.model,
            additional_authorized_imports=self.imports,
            executor_type=executor_type,
            executor_kwargs=executor_kwargs,
            verbosity_level=2 if self.verbose else 0,
        )

        if self.verbose:
            print("Agent initialized and ready")

    def answer_question(self, question: str, task_file_path: Optional[str] = None) -> str:
        """
        Process a GAIA benchmark question and return the answer

        Args:
            question: The question to answer
            task_file_path: Optional path to a file associated with the question

        Returns:
            The cleaned answer, or a string starting with
            "Error answering question:" when the agent run fails.
        """
        try:
            if self.verbose:
                print(f"Processing question: {question}")
                if task_file_path:
                    print(f"With associated file: {task_file_path}")

            context = question

            if task_file_path:
                try:
                    with open(task_file_path, 'r', encoding="utf-8") as f:
                        file_content = f.read()

                    # Extension without the dot, used as the code-fence language tag.
                    fence_lang = os.path.splitext(task_file_path)[1].lower().lstrip(".")

                    context = (
                        f"\nQuestion: {question}\n"
                        "This question has an associated file. Here is the file content:\n"
                        f"```{fence_lang}\n"
                        f"{file_content}\n"
                        "```\n"
                        "Analyze the file content above to answer the question.\n"
                    )
                except (OSError, ValueError) as file_e:
                    # Unreadable or non-text file: still let the agent try.
                    context = (
                        f"\nQuestion: {question}\n"
                        f"This question has an associated file at path: {task_file_path}\n"
                        f"However, there was an error reading the file: {file_e}\n"
                        "You can still try to answer the question based on the information provided.\n"
                    )

            # Heuristic for questions written backwards. As in the original
            # flow, this replaces any file context built above.
            if question.startswith(".") or ".rewsna eht sa" in question:
                context = (
                    "\nThis question appears to be in reversed text. Here's the reversed version:\n"
                    f"{question[::-1]}\n"
                    "Now answer the question above. Remember to format your answer exactly as requested.\n"
                )

            full_prompt = (
                f"{context}\n"
                "When answering, provide ONLY the precise answer requested.\n"
                "Do not include explanations, steps, reasoning, or additional text.\n"
                "Be direct and specific. GAIA benchmark requires exact matching answers.\n"
                'For example, if asked "What is the capital of France?", respond simply with "Paris".\n'
            )

            answer = self._clean_answer(self.agent.run(full_prompt))

            if self.verbose:
                print(f"Generated answer: {answer}")

            return answer
        except Exception as e:
            # Boundary of the agent run: callers expect a string, never an exception.
            error_msg = f"Error answering question: {e}"
            if self.verbose:
                print(error_msg)
            return error_msg

    def _clean_answer(self, answer: Any) -> str:
        """
        Clean up the answer to remove common prefixes and formatting
        that models often add but that can cause exact match failures.

        Args:
            answer: The raw answer from the model (string, number or any object)

        Returns:
            The cleaned answer as a string
        """
        if isinstance(answer, float):
            # Whole-valued floats print without the trailing ".0". Other floats
            # are rendered plainly: adding "$" or thousands separators would
            # defeat exact-match scoring.
            if answer.is_integer():
                return str(int(answer))
            return str(answer)

        if not isinstance(answer, str):
            return str(answer)

        answer = answer.strip()

        # Each prefix is tested once, in order, against the progressively
        # stripped text.
        for prefix in self._ANSWER_PREFIXES:
            if answer.startswith(prefix):
                answer = answer[len(prefix):].strip()

        # Drop one pair of matching surrounding quotes.
        if (answer.startswith('"') and answer.endswith('"')) or (answer.startswith("'") and answer.endswith("'")):
            answer = answer[1:-1].strip()

        return answer
|
|
|
|
|
|
|
|
|
|
|
def setup_agent() -> GAIAAgent:
    """
    Build the GAIAAgent used by the evaluation run.

    Raises:
        ValueError: If the module-level HF_TOKEN is empty or missing.
    """
    print("Setting up agent...")

    if HF_TOKEN:
        configured_agent = GAIAAgent(
            model_id="Qwen/Qwen3-Next-80B-A3B-Instruct",
            api_key=HF_TOKEN,
            verbose=True,
        )
        print("Agent setup complete.")
        return configured_agent

    # Without a token the model client cannot authenticate.
    raise ValueError("HF_TOKEN is not set. Cannot initialize agent model.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_system_prompt(task_id: str, question: str) -> str:
    """
    Creates a specific system prompt for the agent,
    optimized for direct answers and OCR tasks.

    The prompt embeds sample Python code for the agent to run. That code must
    be plain Python: notebook shell escapes such as ``!pip install`` are a
    syntax error outside IPython, so dependencies are described as
    prerequisites instead.

    Args:
        task_id: Identifier of the task; also used to build the file download URL.
        question: The question text, inserted verbatim into the prompt.

    Returns:
        The fully formatted prompt text.
    """
    # Doubled braces ({{ }}) survive as literal braces in the generated prompt;
    # single braces are filled in now from task_id / question / API_BASE_URL.
    return f"""
You are an expert AI agent. Your sole purpose is to solve a task and provide a single, exact, and final answer.

**CRITICAL RULES:**
1. You MUST write and execute Python code to solve the problem.
2. You MUST follow the "HOW TO WORK" steps.
3. Your final response MUST be ONLY the answer. No "Thought", "Action", "Final Answer:", or any other text.

**TASK DETAILS:**
- **Question:** "{question}"
- **Task ID:** {task_id}

**HOW TO WORK:**
1. **Think (Internal)**: Analyze the question. (Do not output this thought).
2. **Search (If Needed)**: Use `web_search` if you need external information.
3. **Download File**: You MUST download the relevant file using this exact Python code block. The file's name will be stored in the `file_name` variable.
    ```python
    import requests
    import mimetypes

    file_url = f"{API_BASE_URL}/files/{task_id}"
    print(f"Downloading file from {{file_url}}...")
    response = requests.get(file_url, timeout=60)
    response.raise_for_status()

    # Determine file extension (the header may be missing or carry parameters)
    content_type = response.headers.get('content-type') or ''
    extension = mimetypes.guess_extension(content_type.split(';')[0].strip())
    if not extension:
        if 'csv' in content_type:
            extension = '.csv'
        elif 'excel' in content_type or 'spreadsheetml' in content_type:
            extension = '.xlsx'
        elif 'jpeg' in content_type:
            extension = '.jpg'
        elif 'png' in content_type:
            extension = '.png'
        else:
            extension = '.bin'  # default

    file_name = "downloaded_file" + extension
    with open(file_name, 'wb') as f:
        f.write(response.content)
    print(f"File saved as {{file_name}}")
    print(f"File content-type: {{content_type}}")
    ```
4. **Analyze File with Code**: After downloading, use the `file_name` variable to analyze the file.
    - **For CSV/Excel files (e.g., .csv, .xlsx)**:
        ```python
        # pandas and openpyxl must already be installed in the execution environment
        import pandas as pd

        # The 'file_name' variable comes from the download step
        try:
            if file_name.endswith('.csv'):
                df = pd.read_csv(file_name)
            else:
                df = pd.read_excel(file_name)
            print(df.head())
            # ... write more code to get the answer from the 'df' dataframe
        except Exception as e:
            print(f"Error reading file with pandas: {{e}}")
        ```
    - **For Image files (e.g., .jpg, .png)**:
        ```python
        # Tesseract-OCR, Pillow and pytesseract must already be installed
        # in the execution environment
        import pytesseract
        from PIL import Image

        # The 'file_name' variable comes from the download step
        try:
            img = Image.open(file_name)
            text_from_image = pytesseract.image_to_string(img)
            print("--- OCR Result ---")
            print(text_from_image)
            print("------------------")
            # ... write more code to analyze 'text_from_image' to find the answer
        except Exception as e:
            print(f"Error processing image with OCR: {{e}}")
        ```
5. **Produce Final Answer**: After all steps, your code must find the answer. Your *final* output message MUST be *only* this answer.
    - **Example of your internal code (DO NOT OUTPUT):**
        ```python
        # ... code to find the answer ...
        final_answer = 42
        print(final_answer) # This is how you print the final answer
        ```
    - **Your Final Response (Correct):**
        `42`
    - **Your Final Response (INCORRECT):**
        `Thought: The final answer is 42. Final Answer: 42`
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_questions() -> List[Dict[str, Any]]:
    """
    Fetches the list of questions from the challenge API.
    Also saves the questions to a local `questions.json` file.

    Returns:
        The decoded JSON payload returned by QUESTIONS_URL.

    Raises:
        Exception: Any error from the HTTP request or JSON decoding is
            printed and re-raised. A failure to write the local cache file
            only produces a warning.
    """
    print(f"Fetching questions from {QUESTIONS_URL}...")
    try:
        # A timeout keeps the UI from hanging forever on a stalled server.
        response = requests.get(QUESTIONS_URL, timeout=30)
        response.raise_for_status()
        questions = response.json()
        print(f"Successfully fetched {len(questions)} questions.")
    except Exception as e:
        print(f"Error fetching questions: {e}")
        raise

    # Caching is best effort: the run can continue without the local copy.
    try:
        with open(QUESTIONS_FILE, 'w', encoding="utf-8") as f:
            json.dump(questions, f, indent=4)
        print(f"Questions saved to {QUESTIONS_FILE}")
    except (OSError, TypeError, ValueError) as e:
        print(f"Warning: Could not save questions to {QUESTIONS_FILE}. Error: {e}")

    return questions
|
|
|
|
|
def process_question(agent: GAIAAgent, task: Dict[str, Any], all_answers: List[Dict[str, str]]) -> Dict[str, str]:
    """
    Run the agent on one task and record the outcome.

    The new record is appended to ``all_answers`` (mutated in place) and the
    whole list is rewritten to ANSWERS_FILE after every task so that progress
    survives an interrupted run.

    Args:
        agent: Agent whose ``answer_question`` method is called.
        task: Task dict; the "task_id" and "question" keys are read.
        all_answers: Accumulator of answer records for the current run.

    Returns:
        Dict with "task_id" and "submitted_answer"; the answer is
        "ERROR_PROCESSING" when the agent call raised.
    """
    task_id, question = task.get("task_id"), task.get("question")

    print(f"\n--- Processing Task {task_id} ---")
    print(f"Question: {question}")

    try:
        submitted = str(agent.answer_question(question=question)).strip()
        print(f"Agent's Answer: {submitted}")
    except Exception as e:
        print(f"Error processing task {task_id}: {e}")
        submitted = "ERROR_PROCESSING"

    answer_dict = {"task_id": task_id, "submitted_answer": submitted}
    all_answers.append(answer_dict)

    # Persist everything collected so far; a write failure is only a warning.
    try:
        with open(ANSWERS_FILE, 'w') as out_file:
            json.dump(all_answers, out_file, indent=4)
    except Exception as e:
        print(f"Warning: Could not save incremental answers to {ANSWERS_FILE}. Error: {e}")

    return answer_dict
|
|
|
|
|
def submit_results(username: str, agent_code_url: str, answers: List[Dict[str, str]]) -> str:
    """
    Submits the collected answers to the API and returns a status string.

    Args:
        username: Hugging Face username the submission is attributed to.
        agent_code_url: URL of the agent's code, sent as "agent_code".
        answers: Answer records, each with "task_id" and "submitted_answer".

    Returns:
        A human-readable status message. Failures are reported in the
        returned text rather than raised.
    """
    print(f"\n--- Submitting {len(answers)} Answers ---")

    submission_payload = {
        "username": username,
        "agent_code": agent_code_url,
        "answers": answers,
    }

    try:
        response = requests.post(SUBMIT_URL, json=submission_payload, timeout=60)
        response.raise_for_status()

        result = response.json()

        print("Submission Successful!")
        final_status = (
            f"Submission Successful!\n"
            f"User: {result.get('username')}\n"
            f"Overall Score: {result.get('score', 'N/A')}% "
            f"({result.get('correct_count', '?')}/{result.get('total_attempted', '?')} correct)\n"
            f"Message: {result.get('message', 'No message received.')}"
        )
        return final_status

    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
        except ValueError:
            # JSON decode errors subclass ValueError, which also works on
            # requests versions that lack requests.exceptions.JSONDecodeError.
            error_detail += f" Response: {e.response.text[:500]}"
        else:
            # The error body is not guaranteed to be a JSON object; without
            # this check a list or string body would raise AttributeError
            # out of this handler.
            if isinstance(error_json, dict):
                detail = error_json.get('detail', e.response.text)
            else:
                detail = error_json
            error_detail += f" Detail: {detail}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        return status_message
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        return status_message
    except Exception as e:
        # Last-resort boundary so the UI always receives a status string.
        status_message = f"Error submitting results: {e}"
        print(status_message)
        return status_message
|
|
|
|
|
def get_agent_code_url() -> str:
    """Pick the agent code URL to report with a submission.

    Order of preference: a customised YOUR_HF_SPACE_URL, then a URL built
    from the SPACE_ID environment variable, then DEFAULT_HF_SPACE_URL.
    """
    # A value that differs from the default counts as explicitly configured.
    if YOUR_HF_SPACE_URL != DEFAULT_HF_SPACE_URL:
        return YOUR_HF_SPACE_URL

    space_id = os.getenv("SPACE_ID")
    if space_id:
        return f"https://huggingface.co/spaces/{space_id}/tree/main"

    print("Warning: Could not determine HF Space URL. Using default.")
    return DEFAULT_HF_SPACE_URL
|
|
|
|
|
|
|
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the REAL agent on them, saves answers,
    submits all answers, and displays the results.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in.

    Returns:
        Tuple ``(status_message, results)`` where ``results`` is a pandas
        DataFrame of task id / question / submitted answer, or None when the
        run stopped before any question was processed.
    """
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    username = f"{profile.username}"
    print(f"User logged in: {username}")

    agent_code_url = get_agent_code_url()

    # 1. Fetch the questions.
    try:
        questions_data = get_questions()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None

    # 2. Build the agent.
    try:
        agent = setup_agent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e} (Did you set HF_TOKEN secret?)", None

    # 3. Run the agent on every well-formed question.
    results_log = []
    all_answers = []
    print(f"Running agent on {len(questions_data)} questions...")

    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        # process_question appends to all_answers and saves it to disk.
        answer_dict = process_question(agent, item, all_answers)

        results_log.append({
            "Task ID": task_id,
            "Question": question_text,
            "Submitted Answer": answer_dict.get("submitted_answer"),
        })

    if not all_answers:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Submit everything in one request.
    status_update = f"Agent finished. Submitting {len(all_answers)} answers for user '{username}'..."
    print(status_update)

    final_status = submit_results(username, agent_code_url, all_answers)
    results_df = pd.DataFrame(results_log)
    return final_status, results_df
|
|
|
|
|
def submit_from_file_gradio(profile: gr.OAuthProfile | None):
    """
    Submit previously saved answers without re-running the agent.

    Reads ANSWERS_FILE, submits its contents, and builds a display table.
    Question texts are looked up in QUESTIONS_FILE when it can be read;
    otherwise the raw answer records are shown.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in.

    Returns:
        Tuple ``(status_message, results)`` where ``results`` is a pandas
        DataFrame, or None when nothing was submitted.
    """
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    username = f"{profile.username}"
    print(f"User logged in: {username}")

    agent_code_url = get_agent_code_url()

    print(f"\n--- Submitting Answers from {ANSWERS_FILE} ---")
    try:
        with open(ANSWERS_FILE, 'r') as answers_fh:
            answers = json.load(answers_fh)

        if not answers:
            print(f"No answers found in {ANSWERS_FILE}. Exiting.")
            return f"Error: No answers found in {ANSWERS_FILE}.", None

        print(f"Loaded {len(answers)} answers from {ANSWERS_FILE}.")

        final_status = submit_results(username, agent_code_url, answers)

        # Enrich the table with question texts from the cached questions file.
        try:
            with open(QUESTIONS_FILE, 'r') as questions_fh:
                cached_questions = json.load(questions_fh)

            question_by_id = {}
            for entry in cached_questions:
                question_by_id[entry['task_id']] = entry['question']

            results_log = [
                {
                    "Task ID": record['task_id'],
                    "Question": question_by_id.get(record['task_id'], "Question not found in cache"),
                    "Submitted Answer": record['submitted_answer'],
                }
                for record in answers
            ]
            results_df = pd.DataFrame(results_log)
        except Exception as e:
            # Any problem with the cache falls back to showing the raw answers.
            print(f"Could not load questions file for display: {e}")
            results_df = pd.DataFrame(answers)

        return final_status, results_df

    except FileNotFoundError:
        error_msg = f"Error: The file '{ANSWERS_FILE}' was not found. Cannot submit."
        print(error_msg)
        return error_msg, None
    except Exception as e:
        error_msg = f"An error occurred while reading or submitting answers from file: {e}"
        print(error_msg)
        return error_msg, None
|
|
|
|
|
|
|
|
|
|
|
# Gradio UI definition: a login button, two action buttons, and two output
# widgets (status text box + results table) shared by both actions.
with gr.Blocks() as demo:
    gr.Markdown("# Advanced Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Log in to your Hugging Face account using the button below.
        2. Click **'Run Evaluation & Submit All Answers'** to:
            * Fetch all questions.
            * Run the full agent on every question (This will take a long time!).
            * Save answers to `answers.json`.
            * Submit all answers and get your score.
        3. Click **'Submit from answers.json (no re-run)'** to:
            * Load answers from the `answers.json` file (if it exists).
            * Submit those answers without re-running the agent. This is much faster.
        """
    )

    gr.LoginButton()

    with gr.Row():
        run_button = gr.Button("Run Evaluation & Submit All Answers")
        submit_file_button = gr.Button("Submit from answers.json (no re-run)")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No `inputs` are wired; both handlers take only `profile`.
    # NOTE(review): presumably Gradio supplies the OAuth profile from the
    # handler's type annotation — confirm against the Gradio version in use.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )

    submit_file_button.click(
        fn=submit_from_file_gradio,
        outputs=[status_output, results_table]
    )
|
|
|
|
|
if __name__ == "__main__":
    # Start-up banner plus a summary of the configuration, then launch the UI.
    print("\n" + "-"*30 + " App Starting " + "-"*30)

    space_id_startup = os.getenv("SPACE_ID")
    if not space_id_startup:
        print("ℹ️ SPACE_ID environment variable not found (running locally?).")
        if YOUR_HF_SPACE_URL == DEFAULT_HF_SPACE_URL:
            print("ERROR: `YOUR_HF_SPACE_URL` is not set.")
            print("Please set it in your .env file or as a script variable.")
    else:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")

        # When the URL was left at its default, derive it from the Space id.
        if YOUR_HF_SPACE_URL == DEFAULT_HF_SPACE_URL:
            YOUR_HF_SPACE_URL = f"https://huggingface.co/spaces/{space_id_startup}/tree/main"
            print(f" Set YOUR_HF_SPACE_URL to: {YOUR_HF_SPACE_URL}")

    print(f"API_BASE_URL: {API_BASE_URL}")
    print(f"Using Agent Code URL: {YOUR_HF_SPACE_URL}")
    # Closing rule sized to match the width of the opening banner.
    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface...")
    demo.launch(debug=True, share=False)