Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import requests | |
| import pandas as pd | |
| # Ensure basic_agent.py is in the same directory | |
| from basic_agent import BasicAgent | |
| import json | |
| import tempfile | |
| # --- Constants --- | |
# Base URL of the scoring service; the API_URL environment variable, when set,
# takes precedence over the built-in default.
DEFAULT_API_URL = os.getenv(
    "API_URL",
    "https://agents-course-unit4-scoring.hf.space",
)
# Endpoints derived from the base URL.
QUESTIONS_URL = DEFAULT_API_URL + "/questions"
SUBMIT_URL = DEFAULT_API_URL + "/submit"
# Value stored under "Submitted Answer" for questions that have been loaded
# but not yet run; such rows are excluded when submitting step-by-step results.
PLACEHOLDER_UNATTEMPTED = "_NOT_ATTEMPTED_"
| # --- Agent Instantiation Helper --- | |
def get_agent_instance():
    """Build a new ``BasicAgent``.

    Returns:
        The agent instance, or ``None`` when construction raised. In the
        failure case the error is printed and shown as a Gradio warning.
    """
    agent = None
    try:
        agent = BasicAgent()
    except Exception as exc:
        print(f"Error instantiating agent: {exc}")
        gr.Warning(f"Error initializing agent: {exc}")
    return agent
| # --- Original run_and_submit_all function --- | |
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Run the agent on every question from the server and submit the answers.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when nobody
            is logged in.

    Returns:
        A ``(status message, results DataFrame)`` pair. The DataFrame is empty
        when the run stops before any question is processed.
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        gr.Warning("Please Login first.")
        return "Login required.", pd.DataFrame()

    username = profile.username
    print(f"User logged in: {username}")

    agent = get_agent_instance()
    if not agent:
        return "Failed to initialize agent.", pd.DataFrame()

    # Link to this Space's code when running on the Hub, otherwise a marker.
    if space_id:
        agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    else:
        agent_code = "local_run"

    print(f"Fetching questions from: {QUESTIONS_URL}")
    try:
        questions_response = requests.get(QUESTIONS_URL, timeout=15)
        questions_response.raise_for_status()
        questions_data = questions_response.json()
        if not questions_data:
            return "Fetched questions list is empty.", pd.DataFrame()
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as exc:
        return f"Error fetching/decoding questions: {exc}", pd.DataFrame()

    results_log = []
    answers_payload = []
    print(f"Running agent on all {len(questions_data)} questions...")
    for entry in questions_data:
        task_id = entry.get("task_id")
        q_text = entry.get("question")
        if not task_id or q_text is None:
            print(f"Skipping item: {entry}")
            continue
        try:
            print(f"Running agent for Task ID {task_id}...")
            submitted_answer = agent(task_id, q_text)
        except Exception as exc:
            # Failed runs appear in the table but are not submitted.
            results_log.append({
                "Task ID": task_id,
                "Question": q_text,
                "Submitted Answer": f"AGENT ERROR: {exc}",
            })
        else:
            answers_payload.append({
                "task_id": task_id,
                "submitted_answer": submitted_answer,
            })
            results_log.append({
                "Task ID": task_id,
                "Question": q_text,
                "Submitted Answer": submitted_answer,
            })

    # Passing the column list fixes the column order of the table.
    results_df = pd.DataFrame(
        results_log,
        columns=["Task ID", "Question", "Submitted Answer"],
    )
    if not answers_payload:
        return "Agent produced no answers.", results_df

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"Submitting {len(answers_payload)} answers to: {SUBMIT_URL}")
    print("Submitting data:", json.dumps(submission_data, indent=2))

    # At least 60 seconds, growing by two seconds per submitted answer.
    submit_timeout = max(60, len(answers_payload) * 2)
    try:
        submit_response = requests.post(
            SUBMIT_URL, json=submission_data, timeout=submit_timeout)
        submit_response.raise_for_status()
        result_data = submit_response.json()
        summary = (
            f"Submission Successful! User: {result_data.get('username')}, "
            f"Score: {result_data.get('score', 'N/A')}% ({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')}), "
            f"Msg: {result_data.get('message', '')}"
        )
        return summary, results_df
    except Exception as exc:
        return f"Submission Failed: {exc}", results_df
| # --- Step-by-Step Action Functions --- | |
def load_questions_action(profile: gr.OAuthProfile | None):
    """Fetch the question list from the server and reset step-by-step progress.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when nobody
            is logged in.

    Returns:
        A 4-tuple ``(status message, results log, results DataFrame, None)``.
        The results log is a list of dicts with the keys ``"Task ID"``,
        ``"Question"`` and ``"Submitted Answer"``; every answer starts as
        ``PLACEHOLDER_UNATTEMPTED``. The trailing ``None`` clears the selected
        question index. On any failure the log is ``[]`` and the DataFrame is
        empty.
    """
    if not profile:
        gr.Warning("Please Login first.")
        return "Login required.", [], pd.DataFrame(), None

    print(f"Fetching questions for {profile.username} from: {QUESTIONS_URL}")
    try:
        response = requests.get(QUESTIONS_URL, timeout=15)
        response.raise_for_status()
        questions_server_data = response.json()
        if not questions_server_data:
            return "Fetched questions list is empty.", [], pd.DataFrame(), None

        # Keep only entries that carry both a task id and a question text.
        new_results_log = [
            {
                "Task ID": q.get("task_id"),
                "Question": q.get("question"),
                "Submitted Answer": PLACEHOLDER_UNATTEMPTED,
            }
            for q in questions_server_data
            if q.get("task_id") and q.get("question") is not None
        ]
    except Exception as e:
        msg = f"Error fetching questions: {e}"
        # gr.Error is an exception class and shows nothing unless raised.
        # gr.Warning displays immediately and still lets the status tuple be
        # returned so the UI outputs are updated.
        gr.Warning(msg)
        return msg, [], pd.DataFrame(), None

    msg = f"Fetched {len(new_results_log)} questions. Progress reset."
    gr.Info(msg)
    return (
        msg,
        new_results_log,
        pd.DataFrame(
            new_results_log,
            columns=["Task ID", "Question", "Submitted Answer"],
        ),
        None,
    )
def run_single_question_action(profile: gr.OAuthProfile | None, q_idx: int | None, current_results_log: list):
    """Run the agent on one question and record its answer in the results log.

    Args:
        profile: OAuth profile of the logged-in user, or ``None``.
        q_idx: Zero-based index of the question inside ``current_results_log``.
        current_results_log: List of dicts with the keys ``"Task ID"``,
            ``"Question"`` and ``"Submitted Answer"``.

    Returns:
        A 3-tuple ``(status message, results log, results DataFrame)``. When
        the run cannot start, the log is returned unchanged. Otherwise a copy
        of the log is returned with the entry at ``q_idx`` replaced; the input
        list itself is not modified.
    """
    columns = ["Task ID", "Question", "Submitted Answer"]

    def unchanged(status):
        # Early exit: report the status and leave the log as it was.
        return status, current_results_log, pd.DataFrame(current_results_log, columns=columns)

    if not profile:
        gr.Warning("Please Login first.")
        return unchanged("Login required.")
    if not current_results_log:
        gr.Warning("No questions loaded.")
        return unchanged("No questions loaded.")
    if q_idx is None:
        gr.Warning("Select question or enter index.")
        return unchanged("Invalid index.")
    try:
        # A numeric input may deliver a float; list indexing needs an int.
        q_idx = int(q_idx)
    except (TypeError, ValueError):
        gr.Warning("Select question or enter index.")
        return unchanged("Invalid index.")
    if not 0 <= q_idx < len(current_results_log):
        return unchanged(f"Index {q_idx} out of bounds.")

    agent = get_agent_instance()
    if not agent:
        return unchanged("Agent init failed.")

    item_to_process = current_results_log[q_idx]
    task_id = item_to_process.get("Task ID")
    q_text = item_to_process.get("Question")
    if not task_id or q_text is None:
        return unchanged(f"Invalid question data at index {q_idx}.")

    print(f"Running for Task ID {task_id} (Index {q_idx}): {str(q_text)[:50]}...")
    agent_failed = False
    try:
        submitted_answer = agent(task_id, q_text)
        status_msg = f"Successfully processed Task ID {task_id}."
    except Exception as e:
        agent_failed = True
        submitted_answer = f"AGENT ERROR: {e}"
        status_msg = f"Error on task {task_id}: {e}"

    # Work on a copy so the list held by the caller is not mutated in place.
    updated_results_log = list(current_results_log)
    updated_results_log[q_idx] = {
        "Task ID": task_id,
        "Question": q_text,
        "Submitted Answer": submitted_answer,
    }

    # Failure is tracked with a flag rather than by searching the answer text:
    # the answer may not be a string, and a valid answer could contain the
    # words "AGENT ERROR". gr.Warning is used because gr.Error only shows
    # when raised, which would discard the updated log.
    if agent_failed:
        gr.Warning(status_msg)
    else:
        gr.Info(status_msg)
    return status_msg, updated_results_log, pd.DataFrame(updated_results_log, columns=columns)
def download_progress_action(results_log_list: list):
    """Write the results log to a temporary JSON file and offer it for download.

    Args:
        results_log_list: The results log (list of dicts) to serialise.

    Returns:
        A ``gr.File`` pointing at the temporary ``.json`` file, or ``None``
        when there is nothing to download or the file could not be written.
    """
    if not results_log_list:
        gr.Info("No progress to download.")
        return None
    try:
        # delete=False keeps the file on disk after the handle is closed so
        # it can be served afterwards.
        with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json", encoding='utf-8') as tmpfile:
            json.dump(results_log_list, tmpfile, indent=2)
        # Build the File component only after the with-block has flushed and
        # closed the file, so the complete JSON is on disk when it is read.
        gr.Info("Progress file ready.")
        return gr.File(value=tmpfile.name, label="progress.json")
    except Exception as e:
        # gr.Error only displays when raised; gr.Warning shows a toast while
        # still letting this handler return None.
        gr.Warning(f"Error preparing download: {e}")
        return None
def load_progress_action(uploaded_file_obj):
    """Load a previously downloaded progress file into the results log.

    Args:
        uploaded_file_obj: The uploaded file, either as a filesystem path
            string or as an object exposing the path via ``.name``. ``None``
            means nothing was uploaded.

    Returns:
        A 4-tuple ``(status message, results log, results DataFrame, None)``.
        The trailing ``None`` clears the selected question index. On failure
        the log is ``[]`` and the DataFrame is empty.
    """
    if uploaded_file_obj is None:
        gr.Warning("No file uploaded.")
        return "No file.", [], pd.DataFrame(), None

    required_keys = ("Task ID", "Question", "Submitted Answer")
    try:
        # Accept both a plain path string and a file-like wrapper with .name.
        file_path = getattr(uploaded_file_obj, "name", uploaded_file_obj)
        with open(file_path, "r", encoding='utf-8') as f:
            loaded_data = json.load(f)

        is_valid = isinstance(loaded_data, list) and all(
            isinstance(item, dict) and all(k in item for k in required_keys)
            for item in loaded_data
        )
        if not is_valid:
            raise ValueError(
                "Invalid file format. Expects list of {'Task ID': ..., 'Question': ..., 'Submitted Answer': ...}")
    except Exception as e:
        msg = f"Error loading progress: {e}"
        # gr.Error only displays when raised; gr.Warning shows the toast and
        # still lets the reset outputs be returned.
        gr.Warning(msg)
        return msg, [], pd.DataFrame(), None

    msg = f"Loaded {len(loaded_data)} entries from file."
    gr.Info(msg)
    return (
        msg,
        loaded_data,
        pd.DataFrame(loaded_data, columns=list(required_keys)),
        None,
    )
def submit_current_results_action(profile: gr.OAuthProfile | None, results_log_list: list):
    """Submit every attempted, non-error answer from the results log.

    Args:
        profile: OAuth profile of the logged-in user, or ``None``.
        results_log_list: List of dicts with the keys ``"Task ID"``,
            ``"Question"`` and ``"Submitted Answer"``.

    Returns:
        A status string describing the outcome. Failures are reported in the
        returned string rather than raised.
    """
    if not profile:
        gr.Warning("Please Login first.")
        return "Login required."
    username = profile.username
    if not results_log_list:
        return "No results to submit."

    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local_run"

    # Skip rows that were never run and rows whose agent run failed.
    answers_payload = []
    for entry in results_log_list:
        answer = entry.get("Submitted Answer", PLACEHOLDER_UNATTEMPTED)
        if answer == PLACEHOLDER_UNATTEMPTED or "AGENT ERROR" in str(answer):
            continue
        answers_payload.append(
            {"task_id": entry.get("Task ID"), "submitted_answer": answer})
    if not answers_payload:
        return "No attempted (non-error) answers to submit."

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    gr.Info(f"Submitting {len(answers_payload)} answers for '{username}'...")

    try:
        print("Submitting data:", json.dumps(submission_data, indent=2))
        response = requests.post(
            SUBMIT_URL, json=submission_data, timeout=max(60, len(answers_payload) * 2))
        response.raise_for_status()
        result_data = response.json()
        return (f"Submission Successful! User: {result_data.get('username')}, Score: {result_data.get('score', 'N/A')}% "
                f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')}), Msg: {result_data.get('message', '')}")
    except requests.exceptions.HTTPError as e:
        error_response = e.response
        # Compare against None explicitly: a Response is falsy for 4xx/5xx
        # statuses, so a plain truthiness test would hide the server's text.
        if error_response is None:
            error_detail = f"HTTP error without a response: {e}"
        else:
            error_detail = f"Server responded with status {error_response.status_code}."
            try:
                error_json = error_response.json()
            except ValueError:
                # The body is not JSON; show the start of the raw text.
                error_detail += f" Response: {error_response.text[:500]}"
            else:
                if isinstance(error_json, dict):
                    detail = error_json.get("detail", error_response.text)
                else:
                    detail = error_json
                error_detail += f" Detail: {detail}"
    except Exception as e:
        # Timeouts, connection failures, undecodable success bodies, etc.
        error_detail = str(e)

    status_message = f"Submission Failed: {error_detail}"
    # gr.Error only displays when raised; gr.Warning shows the toast and still
    # lets the status text be returned to the UI.
    gr.Warning(status_message)
    return status_message
| # --- Build Gradio Interface --- | |
# NOTE(review): the container nesting below (which components sit inside each
# Row / Accordion / TabItem) was reconstructed from a source whose indentation
# had been stripped — confirm the layout against the running app.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Enhanced Agent Evaluation Runner")

    # Session state: the list of question/answer dicts shared by all
    # step-by-step handlers.
    results_log_list_state = gr.State([])

    gr.LoginButton()

    with gr.Tabs():
        with gr.TabItem("Step-by-Step Evaluation"):
            gr.Markdown("## Evaluation Workflow")

            with gr.Row():
                load_questions_button = gr.Button(
                    "1. Load Questions from Server",
                    variant="secondary",
                )
                load_q_status = gr.Textbox(
                    label="Load Status",
                    interactive=False,
                    lines=1,
                )

            gr.Markdown("### 2. Select a Question and Run Agent")

            # Main table showing every question together with its answer.
            results_display_table = gr.DataFrame(
                label="Questions & Answers (Select row to run agent)",
                headers=["Task ID", "Question", "Submitted Answer"],
                row_count=10,
                wrap=True,
                interactive=True,
            )

            with gr.Row():
                q_number_input = gr.Number(
                    label="Selected Question Index",
                    minimum=0,
                    precision=0,
                    step=1,
                    value=None,
                    interactive=True,
                )
                run_single_q_button = gr.Button(
                    "Run Agent for Selected Index",
                    variant="primary",
                )
            single_q_status = gr.Textbox(
                label="Run Single Status",
                interactive=False,
                lines=1,
            )

            with gr.Accordion("3. Manage Full Progress (Download/Upload)", open=False):
                download_file_output = gr.File(
                    label="Download Link",
                    interactive=False,
                )
                download_button = gr.Button("Download All Progress")
                with gr.Row():
                    upload_file_input = gr.File(
                        label="Upload Progress File (JSON)",
                        type="filepath",
                        file_types=[".json"],
                    )
                    load_progress_button = gr.Button("Load Uploaded File")
                upload_status = gr.Textbox(
                    label="Upload Status",
                    interactive=False,
                    lines=1,
                )

            gr.Markdown("### 4. Submit Results")
            submit_step_by_step_button = gr.Button(
                "Submit Attempted Answers",
                variant="primary",
            )
            submit_sbs_status = gr.Textbox(
                label="Submission Status",
                lines=3,
                interactive=False,
            )

        with gr.TabItem("Run All & Submit (Original Batch)"):
            gr.Markdown("## Original Batch Runner")
            original_run_button = gr.Button(
                "Run All Questions & Submit",
                variant="primary",
            )
            original_status_output = gr.Textbox(
                label="Batch Run Status / Result",
                lines=3,
                interactive=False,
            )
            original_results_table = gr.DataFrame(
                label="Batch Run Q&A",
                wrap=True,
                interactive=False,
                headers=["Task ID", "Question", "Submitted Answer"],
            )

    # --- Event wiring for the step-by-step tab ---
    load_questions_button.click(
        fn=load_questions_action,
        inputs=[],
        outputs=[
            load_q_status,
            results_log_list_state,
            results_display_table,
            q_number_input,
        ],
    )

    def handle_select_question_from_results_table(evt: gr.SelectData):
        """Map a table selection event to a row index, or None if unusable."""
        selected = evt.index
        # Cell selection delivered as a (row, col) tuple: keep the row.
        if isinstance(selected, tuple):
            return selected[0]
        # Plain row index.
        if isinstance(selected, int):
            return selected
        # Non-empty list: keep the first element.
        if isinstance(selected, list) and selected:
            return selected[0]
        return None

    results_display_table.select(
        fn=handle_select_question_from_results_table,
        inputs=None,
        outputs=[q_number_input],
        show_progress="hidden",
    )

    run_single_q_button.click(
        fn=run_single_question_action,
        inputs=[q_number_input, results_log_list_state],
        outputs=[single_q_status, results_log_list_state, results_display_table],
    )

    download_button.click(
        download_progress_action,
        [results_log_list_state],
        [download_file_output],
    )

    load_progress_button.click(
        load_progress_action,
        [upload_file_input],
        [
            upload_status,
            results_log_list_state,
            results_display_table,
            q_number_input,
        ],
    )

    submit_step_by_step_button.click(
        submit_current_results_action,
        [results_log_list_state],
        [submit_sbs_status],
    )

    # --- Event wiring for the batch tab ---
    original_run_button.click(
        run_and_submit_all,
        [],
        [original_status_output, original_results_table],
    )
if __name__ == "__main__":
    # Startup banner plus a summary of the Space-related environment
    # variables, followed by launching the Gradio app.
    banner = " App Starting "
    print("\n" + "-" * 30 + banner + "-" * 30)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        # NOTE(review): SPACE_HOST is understood to already be a full host
        # name ending in ".hf.space" — confirm against the Spaces docs. The
        # suffix is appended only when missing so the URL never doubles it.
        if space_host_startup.endswith(".hf.space"):
            runtime_host = space_host_startup
        else:
            runtime_host = f"{space_host_startup}.hf.space"
        print(
            f"✅ SPACE_HOST: {space_host_startup}, URL: https://{runtime_host}")
    else:
        print("ℹ️ SPACE_HOST not found (local run?).")

    if space_id_startup:
        print(
            f"✅ SPACE_ID: {space_id_startup}, Repo: https://huggingface.co/spaces/{space_id_startup}")
    else:
        print("ℹ️ SPACE_ID not found. Repo URL cannot be determined.")

    # Closing rule with the same width as the banner line above.
    print("-" * (60 + len(banner)) + "\n")
    demo.launch(debug=True)