import argparse
import json
import os
from collections import OrderedDict
from datetime import datetime

import matplotlib.pyplot as plt
from tqdm import tqdm

from benchmark import MapGuesserBenchmark
from config import MODELS_CONFIG, get_data_paths, SUCCESS_THRESHOLD_KM, get_model_class
from data_collector import DataCollector
from geo_bot import GeoBot
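
# Rough overview of how the imported modules are used in this script: GeoBot drives
# the in-browser MapCrunch session, MapGuesserBenchmark scores guesses against the
# ground-truth coordinates, DataCollector builds new datasets, and the config module
# supplies the model registry, data paths, and success threshold.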


def agent_mode(
    model_name: str,
    steps: int,
    headless: bool,
    samples: int,
    dataset_name: str = "default",
    temperature: float = 0.0,
):
    """
    Runs the AI Agent in a benchmark loop over multiple samples,
    using multi-step exploration for each.
    """
    print(
        f"Starting Agent Mode: model={model_name}, steps={steps}, samples={samples}, dataset={dataset_name}, temperature={temperature}"
    )
    data_paths = get_data_paths(dataset_name)
    try:
        with open(data_paths["golden_labels"], "r", encoding="utf-8") as f:
            golden_labels = json.load(f).get("samples", [])
    except FileNotFoundError:
        print(
            f"Error: Dataset '{dataset_name}' not found at {data_paths['golden_labels']}."
        )
        return

    if not golden_labels:
        print(f"Error: No samples found in dataset '{dataset_name}'.")
        return

    num_to_test = min(samples, len(golden_labels))
    test_samples = golden_labels[:num_to_test]
    print(f"Will run on {len(test_samples)} samples from dataset '{dataset_name}'.")
    config = MODELS_CONFIG.get(model_name)
    if config is None:
        print(f"Error: Unknown model '{model_name}'. Check MODELS_CONFIG.")
        return
    model_class = get_model_class(config["class"])
    model_instance_name = config["model_name"]

    benchmark_helper = MapGuesserBenchmark(dataset_name=dataset_name, headless=True)
    all_results = []
    with GeoBot(
        model=model_class,
        model_name=model_instance_name,
        headless=headless,
        temperature=temperature,
    ) as bot:
        for i, sample in enumerate(test_samples):
            print(
                f"\n--- Running Sample {i + 1}/{len(test_samples)} (ID: {sample.get('id')}) ---"
            )

            if not bot.controller.load_location_from_data(sample):
                print(
                    f"  ❌ Failed to load location for sample {sample.get('id')}. Skipping."
                )
                continue

            bot.controller.setup_clean_environment()
            final_guess = bot.run_agent_loop(max_steps=steps)

            true_coords = {"lat": sample.get("lat"), "lng": sample.get("lng")}
            distance_km = None
            is_success = False

            if final_guess:
                distance_km = benchmark_helper.calculate_distance(
                    true_coords, final_guess
                )
                if distance_km is not None:
                    is_success = distance_km <= SUCCESS_THRESHOLD_KM

                print(f"\nResult for Sample ID: {sample.get('id')}")
                print(
                    f"  Ground Truth: Lat={true_coords['lat']:.4f}, Lon={true_coords['lng']:.4f}"
                )
                print(
                    f"  Final Guess:  Lat={final_guess[0]:.4f}, Lon={final_guess[1]:.4f}"
                )
                dist_str = f"{distance_km:.1f} km" if distance_km is not None else "N/A"
                print(f"  Distance: {dist_str}, Success: {is_success}")
            else:
                print("Agent did not make a final guess for this sample.")

            all_results.append(
                {
                    "sample_id": sample.get("id"),
                    "model": bot.model_name,
                    "true_coordinates": true_coords,
                    "predicted_coordinates": final_guess,
                    "distance_km": distance_km,
                    "success": is_success,
                }
            )
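
    # Aggregate the per-sample records into per-model statistics (success rate
    # within SUCCESS_THRESHOLD_KM and mean distance), mirroring benchmark_mode's output.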
    summary = benchmark_helper.generate_summary(all_results)
    if summary:
        print(
            f"\n\n--- Agent Benchmark Complete for dataset '{dataset_name}'! Summary ---"
        )
        for model, stats in summary.items():
            print(f"Model: {model}")
            print(f"  Success Rate: {stats['success_rate'] * 100:.1f}%")
            print(f"  Avg Distance: {stats['average_distance_km']:.1f} km")

    print("Agent Mode finished.")


def benchmark_mode(
    models: list,
    samples: int,
    headless: bool,
    dataset_name: str = "default",
    temperature: float = 0.0,
):
    """Runs the benchmark on pre-collected data."""
    print(
        f"Starting Benchmark Mode: models={models}, samples={samples}, dataset={dataset_name}, temperature={temperature}"
    )
    benchmark = MapGuesserBenchmark(dataset_name=dataset_name, headless=headless)
    summary = benchmark.run_benchmark(
        models=models, max_samples=samples, temperature=temperature
    )
    if summary:
        print(f"\n--- Benchmark Complete for dataset '{dataset_name}'! Summary ---")
        for model, stats in summary.items():
            print(f"Model: {model}")
            print(f"  Success Rate: {stats['success_rate'] * 100:.1f}%")
            print(f"  Avg Distance: {stats['average_distance_km']:.1f} km")


def collect_mode(dataset_name: str, samples: int, headless: bool):
    """Collects data for a new dataset."""
    print(f"Starting Data Collection: dataset={dataset_name}, samples={samples}")
    with DataCollector(dataset_name=dataset_name, headless=headless) as collector:
        collector.collect_samples(num_samples=samples)
    print(f"Data collection complete for dataset '{dataset_name}'.")


def test_mode(
    models: list,
    samples: int,
    runs: int,
    steps: int,
    dataset_name: str = "default",
    temperature: float = 0.0,
    headless: bool = True,
):
    """
    CLI multi-model / multi-run benchmark.

    For each model:
      • run `runs` times
      • each run evaluates `samples` images
      • record hit-rate per step and average distance
    """
    # ---------- load dataset ----------
    data_paths = get_data_paths(dataset_name)
    try:
        with open(data_paths["golden_labels"], "r", encoding="utf-8") as f:
            all_samples = json.load(f)["samples"]
    except FileNotFoundError:
        print(f"❌ dataset '{dataset_name}' not found.")
        return
    if not all_samples:
        print("❌ dataset is empty.")
        return

    test_samples = all_samples[:samples]
    print(f"📁 loaded {len(test_samples)} samples from '{dataset_name}'")
    benchmark_helper = MapGuesserBenchmark(dataset_name=dataset_name, headless=headless)

    summary_by_step: dict[str, list[float]] = OrderedDict()
    avg_distances: dict[str, list[float]] = {}

    time_tag = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_dir = os.path.join("./results", "test", time_tag)
    os.makedirs(base_dir, exist_ok=True)
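    # Output layout produced below: {base_dir}/{model}/{model}_log.json holds the raw
    # per-sample predictions, {base_dir}/{model}/{model}.json the per-step aggregates,
    # and the summary plots are written to the top level of {base_dir}.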
    # ---------- iterate over models ----------
    for model_name in models:
        log_json = {}
        print(f"\n===== {model_name} =====")
        cfg = MODELS_CONFIG[model_name]
        model_cls = get_model_class(cfg["class"])

        hits_per_step = [0] * steps
        distance_per_step = [0.0] * steps
        total_iterations = runs * len(test_samples)
        with tqdm(total=total_iterations, desc=model_name) as pbar:
            for run_id in range(runs):
                with GeoBot(
                    model=model_cls,
                    model_name=cfg["model_name"],
                    headless=headless,
                    temperature=temperature,
                ) as bot:
                    for sample in test_samples:
                        if not bot.controller.load_location_from_data(sample):
                            pbar.update(1)
                            continue

                        preds = bot.test_run_agent_loop(max_steps=steps)
                        gt = {"lat": sample["lat"], "lng": sample["lng"]}

                        if sample["id"] not in log_json:
                            log_json[sample["id"]] = []

                        for idx, pred in enumerate(preds):
                            if isinstance(pred, dict) and "lat" in pred and "lon" in pred:
                                dist = benchmark_helper.calculate_distance(
                                    gt, (pred["lat"], pred["lon"])
                                )
                                if dist is not None:
                                    distance_per_step[idx] += dist
                                    preds[idx]["distance"] = dist
                                    if dist <= SUCCESS_THRESHOLD_KM:
                                        hits_per_step[idx] += 1
                                        preds[idx]["success"] = True
                                    else:
                                        preds[idx]["success"] = False

                        log_json[sample["id"]].append({
                            "run_id": run_id,
                            "predictions": preds,
                        })
                        pbar.update(1)

        os.makedirs(f"{base_dir}/{model_name}", exist_ok=True)
        with open(f"{base_dir}/{model_name}/{model_name}_log.json", "w") as f:
            json.dump(log_json, f, indent=2)
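
        # Normalise the accumulated hits/distances by the total number of evaluations.
        # For example, with runs=3 and 50 samples, denom is 150, so 90 hits at a given
        # step corresponds to an accuracy of 0.6 for that step.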
        denom = runs * len(test_samples)
        summary_by_step[model_name] = [h / denom for h in hits_per_step]
        avg_distances[model_name] = [d / denom for d in distance_per_step]

        payload = {
            "avg_distance_km": avg_distances[model_name],
            "accuracy_per_step": summary_by_step[model_name],
        }
        with open(f"{base_dir}/{model_name}/{model_name}.json", "w") as f:
            json.dump(payload, f, indent=2)

    print(f"💾 results saved to {base_dir}")
    # ---------- pretty table ----------
    header = ["Step"] + list(summary_by_step.keys())
    row_width = max(len(h) for h in header) + 2

    print("\n=== ACCURACY PER STEP ===")
    print(" | ".join(h.center(row_width) for h in header))
    print("-" * (row_width + 3) * len(header))
    for i in range(steps):
        cells = [str(i + 1).center(row_width)]
        for m in summary_by_step:
            cells.append(f"{summary_by_step[m][i] * 100:5.1f}%".center(row_width))
        print(" | ".join(cells))

    print("\n=== AVG DISTANCE PER STEP (km) ===")
    header = ["Step"] + list(avg_distances.keys())
    row_w = max(len(h) for h in header) + 2
    print(" | ".join(h.center(row_w) for h in header))
    print("-" * (row_w + 3) * len(header))
    for i in range(steps):
        cells = [str(i + 1).center(row_w)]
        for m in avg_distances:
            v = avg_distances[m][i]
            cells.append(f"{v:6.1f}".center(row_w) if v is not None else "N/A".center(row_w))
        print(" | ".join(cells))
    # ---------- plots ----------
    try:
        plt.figure()
        for model, acc in summary_by_step.items():
            plt.plot(range(1, steps + 1), acc, marker="o", label=model)
        plt.xlabel("step")
        plt.ylabel("accuracy")
        plt.ylim(0, 1)
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.title("Accuracy vs Step")
        plt.savefig(f"{base_dir}/accuracy_step.png", dpi=120)
        print(f"\n📊 saved plot to {base_dir}/accuracy_step.png")

        # Plot average distance per step for each model
        plt.figure()
        for model, dists in avg_distances.items():
            plt.plot(range(1, steps + 1), dists, marker="o", label=model)
        plt.xlabel("step")
        plt.ylabel("Avg Distance (km)")
        plt.title("Average Distance per Model")
        plt.legend()
        plt.tight_layout()
        plt.savefig(f"{base_dir}/avg_distance.png", dpi=120)
        print(f"📊 saved plot to {base_dir}/avg_distance.png")
    except Exception as e:
        print(f"⚠️ plot skipped: {e}")


def main():
    parser = argparse.ArgumentParser(description="MapCrunch AI Agent & Benchmark")
    parser.add_argument(
        "--mode",
        choices=["agent", "benchmark", "collect", "test"],
        default="agent",
        help="Operation mode.",
    )
    parser.add_argument(
        "--dataset",
        default="default",
        help="Dataset name to use or create.",
    )
    parser.add_argument(
        "--model",
        choices=list(MODELS_CONFIG.keys()),
        default="gpt-4o",
        help="Model to use.",
    )
    parser.add_argument(
        "--steps", type=int, default=10, help="[Agent] Number of exploration steps."
    )
    parser.add_argument(
        "--samples",
        type=int,
        default=50,
        help="Number of samples to process for the selected mode.",
    )
    parser.add_argument(
        "--headless", action="store_true", help="Run browser in headless mode."
    )
    parser.add_argument(
        "--models",
        nargs="+",
        choices=list(MODELS_CONFIG.keys()),
        help="[Benchmark/Test] Models to benchmark.",
    )
    parser.add_argument(
        "--temperature",
        type=float,
        default=0.0,
        help="Temperature for LLM sampling (0.0 = deterministic, higher = more random). Default: 0.0",
    )
    parser.add_argument("--runs", type=int, default=3, help="[Test] Runs per model.")

    args = parser.parse_args()
    if args.mode == "collect":
        collect_mode(
            dataset_name=args.dataset,
            samples=args.samples,
            headless=args.headless,
        )
    elif args.mode == "agent":
        agent_mode(
            model_name=args.model,
            steps=args.steps,
            headless=args.headless,
            samples=args.samples,
            dataset_name=args.dataset,
            temperature=args.temperature,
        )
    elif args.mode == "benchmark":
        benchmark_mode(
            models=args.models or [args.model],
            samples=args.samples,
            headless=args.headless,
            dataset_name=args.dataset,
            temperature=args.temperature,
        )
    elif args.mode == "test":
        test_mode(
            models=args.models or [args.model],
            samples=args.samples,
            runs=args.runs,
            steps=args.steps,
            dataset_name=args.dataset,
            temperature=args.temperature,
            headless=args.headless,
        )


if __name__ == "__main__":
    main()
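
# Example invocations, sketched from the flags defined above (the filename main.py
# and the model/dataset names are assumptions for illustration):
#   python main.py --mode collect --dataset default --samples 50 --headless
#   python main.py --mode agent --model gpt-4o --steps 10 --samples 50
#   python main.py --mode benchmark --models gpt-4o --samples 50 --headless
#   python main.py --mode test --models gpt-4o --runs 3 --steps 10 --samples 50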