import os
import random
import json
import uuid
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, List, Optional

import spaces
import requests
from dotenv import load_dotenv
import gradio as gr
from gradio.components import LoginButton
import data_manager
from huggingface_hub import HfApi, hf_hub_download, whoami
from transformers import Mistral3ForConditionalGeneration, AutoTokenizer, TextIteratorStreamer
import threading
import torch

load_dotenv()

APP_SECRET = os.urandom(24)
ZONES_FILE = 'zones.json'
zones = {
    "easy": [],
    "medium": [],
    "hard": []
}
user_sessions: Dict[str, Dict[str, Any]] = {}
DEFAULT_USERNAME = "player"


def save_zones_to_file():
    with open(ZONES_FILE, 'w') as f:
        json.dump(zones, f, indent=4)


def load_zones_from_file():
    global zones
    if os.path.exists(ZONES_FILE):
        try:
            with open(ZONES_FILE, 'r') as f:
                loaded_zones = json.load(f)
            if not (isinstance(loaded_zones, dict) and all(k in loaded_zones for k in ["easy", "medium", "hard"])):
                raise ValueError("Invalid format")
            migrated = False
            for difficulty in loaded_zones:
                for zone in loaded_zones[difficulty]:
                    if 'id' not in zone:
                        zone['id'] = uuid.uuid4().hex
                        migrated = True
            zones = loaded_zones
            print(zones)
            if migrated:
                print("Info: Migrated old zone data by adding unique IDs.")
                save_zones_to_file()
        except (json.JSONDecodeError, IOError, ValueError):
            print(f"Warning: '{ZONES_FILE}' is corrupted or invalid. Recreating with empty zones.")
            save_zones_to_file()
    else:
        save_zones_to_file()


LOCATIONS = [
    {'lat': 48.85824, 'lng': 2.2945},
    {'lat': 40.748440, 'lng': -73.985664},
    {'lat': 35.689487, 'lng': 139.691711},
    {'lat': -33.856784, 'lng': 151.215297}
]


def generate_id():
    return ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10))
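# Illustrative only -- a sketch of the external configuration this app expects, inferred
# from the code in this file (the values below are placeholders, not real keys or zones):
#
#   * Environment variables: GOOGLE_MAPS_API_KEY (Street View imagery/metadata, Roads,
#     Geocoding and the client-side map) and HF_TOKEN (server-side token passed to
#     data_manager when reading the day's game records).
#   * zones.json, created automatically if missing; pick_random_location() samples from
#     rectangle zones shaped roughly like:
#       {
#           "easy": [
#               {"id": "3f2a9c...", "type": "rectangle",
#                "bounds": {"north": 49.0, "south": 48.7, "east": 2.5, "west": 2.2}}
#           ],
#           "medium": [],
#           "hard": []
#       }
#   * static/style.css, whose contents are prepended to the inline APP_CSS further down.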
HF_DATASET_REPO = 'Jofthomas/geoguessr_game_of_the_day'
GOOGLE_MAPS_API_KEY = os.getenv('GOOGLE_MAPS_API_KEY')
SERVER_HF_TOKEN = os.getenv('HF_TOKEN', '')

model_id = "mistralai/Magistral-Small-2509"
tokenizer = AutoTokenizer.from_pretrained(model_id, tokenizer_type="mistral", use_fast=False)
model = Mistral3ForConditionalGeneration.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto"
).eval()

# SYSTEM_PROMPT_TEXT = (
#     "You are a world-class geolocation expert. Given a street-view style image, "
#     "think step by step about visual clues and infer approximate coordinates. "
#     "When you conclude, output your answer inside [ANSWER]lat,lng[/ANSWER]."
# )
SYSTEM_PROMPT_TEXT = """First draft your thinking process (inner monologue) until you arrive at a response. Format your response using Markdown, and use LaTeX for any mathematical equations. Write both your thoughts and the response in the same language as the input.

Your thinking process must follow the template below:
[THINK]
Your thoughts or/and draft, like working through an exercise on scratch paper. Be as casual and as long as you want until you are confident to generate the response to the user.
[/THINK]
Here, provide a self-contained response."""

USER_INSTRUCTION = """You are a world-class geolocation expert. Given a street-view style image, think step by step about visual clues and infer approximate coordinates. When you conclude, output your final answer inside [ANSWER]lat,lng[/ANSWER]. Please analyze this image and provide coordinates in the required format."""


@spaces.GPU(duration=120)
def llm_decode_image_return_text(image_bytes: bytes) -> str:
    print(f"[llm] decode start. image_bytes={len(image_bytes)} bytes")
    import base64, mimetypes
    try:
        encoded_image = base64.b64encode(image_bytes).decode('utf-8')
        mime_type = 'image/jpeg'
        data_url = f"data:{mime_type};base64,{encoded_image}"
        messages = [
            {"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT_TEXT}]},
            {"role": "user", "content": [
                {"type": "text", "text": USER_INSTRUCTION},
                {"type": "image_url", "image_url": {"url": data_url}},
            ]},
        ]
        print(f"[llm] messages prepared. system+user with image_url length={len(data_url)}")
        tokenized = tokenizer.apply_chat_template(messages, return_dict=True)
        print(f"[llm] tokenized keys={list(tokenized.keys())}")
        input_ids = torch.tensor(tokenized.input_ids).unsqueeze(0)
        attention_mask = torch.tensor(tokenized.attention_mask).unsqueeze(0)
        print(f"[llm] input_ids shape={tuple(input_ids.shape)} attn_mask shape={tuple(attention_mask.shape)} device={model.device}")
        kwargs = {
            'input_ids': input_ids.to(model.device),
            'attention_mask': attention_mask.to(model.device),
        }
        if 'pixel_values' in tokenized and len(tokenized.pixel_values) > 0:
            pixel_values = torch.tensor(tokenized.pixel_values[0], dtype=model.dtype).unsqueeze(0).to(model.device)
            image_sizes = torch.tensor(pixel_values.shape[-2:]).unsqueeze(0).to(model.device)
            kwargs.update({'pixel_values': pixel_values, 'image_sizes': image_sizes})
            print(f"[llm] pixel_values shape={tuple(pixel_values.shape)} image_sizes={tuple(image_sizes.shape)}")
        output = model.generate(**kwargs)[0]
        print(f"[llm] generate done. output length={len(output)}")
        decoded = tokenizer.decode(
            output[len(tokenized.input_ids): (-1 if output[-1] == tokenizer.eos_token_id else len(output))]
        )
        print(f"[llm] decode done. text length={len(decoded)}")
        return decoded
    except Exception as e:
        print(f"[llm] decode failed: {e}")
        return f"[Error] {e}"


@spaces.GPU(duration=120)
def llm_stream_image_text(image_bytes: bytes):
    print(f"[llm-stream] start. image_bytes={len(image_bytes)} bytes")
    import base64
    try:
        encoded_image = base64.b64encode(image_bytes).decode('utf-8')
        data_url = f"data:image/jpeg;base64,{encoded_image}"
        messages = [
            {"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT_TEXT}]},
            {"role": "user", "content": [
                {"type": "text", "text": USER_INSTRUCTION},
                {"type": "image_url", "image_url": {"url": data_url}},
            ]},
        ]
        tokenized = tokenizer.apply_chat_template(messages, return_dict=True)
        input_ids = torch.tensor(tokenized.input_ids).unsqueeze(0)
        attention_mask = torch.tensor(tokenized.attention_mask).unsqueeze(0)
        kwargs = {
            'input_ids': input_ids.to(model.device),
            'attention_mask': attention_mask.to(model.device),
            'max_new_tokens': 80_000,
        }
        if 'pixel_values' in tokenized and len(tokenized.pixel_values) > 0:
            pixel_values = torch.tensor(tokenized.pixel_values[0], dtype=model.dtype).unsqueeze(0).to(model.device)
            image_sizes = torch.tensor(pixel_values.shape[-2:]).unsqueeze(0).to(model.device)
            kwargs.update({'pixel_values': pixel_values, 'image_sizes': image_sizes})
        streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False)
        kwargs['streamer'] = streamer
        thread = threading.Thread(target=model.generate, kwargs=kwargs)
        thread.start()
        acc = ""
        for new_text in streamer:
            acc += new_text
            yield acc
    except Exception as e:
        yield f"[Error] {e}"
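# Illustrative only -- how the two helpers above are meant to be consumed (this mirrors
# the pattern used in on_validate further down); `image_bytes` is any JPEG payload:
#
#     full_text = llm_decode_image_return_text(image_bytes)   # blocking, returns one string
#     for partial in llm_stream_image_text(image_bytes):      # generator yielding the
#         print(partial[-80:])                                 # accumulated text so far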
def pick_random_location(difficulty: str) -> Dict[str, float]:
    candidates = zones.get(difficulty, [])
    if candidates:
        selected_zone = random.choice(candidates)
        if selected_zone.get('type') == 'rectangle':
            b = selected_zone['bounds']
            north, south, east, west = b['north'], b['south'], b['east'], b['west']
            if west > east:
                east += 360
            lng = random.uniform(west, east)
            if lng > 180:
                lng -= 360
            lat = random.uniform(south, north)
            ensured = _ensure_street_view_location(lat, lng)
            if ensured:
                return ensured
    fallback = random.choice(LOCATIONS)
    ensured_fallback = _ensure_street_view_location(fallback['lat'], fallback['lng'])
    return ensured_fallback or fallback


def street_view_image_url(lat: float, lng: float) -> str:
    if not GOOGLE_MAPS_API_KEY:
        # Fallback placeholder to avoid blank image when key is missing
        return "https://picsum.photos/1200/800"
    return (
        f"https://maps.googleapis.com/maps/api/streetview?size=1200x800&location={lat},{lng}&fov=60&pitch=0&source=outdoor&key={GOOGLE_MAPS_API_KEY}"
    )


def _has_street_view(lat: float, lng: float) -> bool:
    if not GOOGLE_MAPS_API_KEY:
        return True
    try:
        resp = requests.get(
            "https://maps.googleapis.com/maps/api/streetview/metadata",
            params={"location": f"{lat},{lng}", "source": "outdoor", "key": GOOGLE_MAPS_API_KEY},
            timeout=5,
        )
        resp.raise_for_status()
        data = resp.json()
        # Check if it's OK and preferably outdoor (not inside buildings)
        if data.get("status") == "OK":
            # Prefer locations that are explicitly outdoor
            location_type = data.get("location_type")
            # If location_type is available, check it's not indoors
            if location_type and location_type == "INDOOR":
                return False
            return True
        return False
    except Exception:
        return False


def _snap_to_nearest_road(lat: float, lng: float) -> Optional[Dict[str, float]]:
    if not GOOGLE_MAPS_API_KEY:
        return None
    try:
        resp = requests.get(
            "https://roads.googleapis.com/v1/nearestRoads",
            params={"points": f"{lat},{lng}", "key": GOOGLE_MAPS_API_KEY},
            timeout=5,
        )
        resp.raise_for_status()
        data = resp.json()
        points = data.get("snappedPoints") or []
        if not points:
            return None
        loc = points[0].get("location") or {}
        if "latitude" in loc and "longitude" in loc:
            return {"lat": float(loc["latitude"]), "lng": float(loc["longitude"])}
    except Exception:
        pass
    return None


def _ensure_street_view_location(lat: float, lng: float) -> Optional[Dict[str, float]]:
    """Return a coordinate with confirmed Street View coverage, snapped near a road when possible."""
    if not GOOGLE_MAPS_API_KEY:
        return {"lat": lat, "lng": lng}
    checked: set[tuple] = set()
    snapped = _snap_to_nearest_road(lat, lng)
    candidates: List[Dict[str, float]] = []
    if snapped:
        candidates.append(snapped)
    candidates.append({"lat": lat, "lng": lng})
    # Explore a few jittered points if needed
    if not snapped:
        increments = [0.0005, -0.0005, 0.001, -0.001]
        for d_lat in increments:
            for d_lng in increments:
                if d_lat == 0 and d_lng == 0:
                    continue
                candidates.append({"lat": lat + d_lat, "lng": lng + d_lng})
    for candidate in candidates:
        key = (round(candidate["lat"], 6), round(candidate["lng"], 6))
        if key in checked:
            continue
        checked.add(key)
        if _has_street_view(candidate["lat"], candidate["lng"]):
            return candidate
    return None


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    from math import radians, cos, sin, asin, sqrt
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    c = 2 * asin(sqrt(a))
    r = 6371
    return c * r


def score_from_distance_km(distance_km: float) -> float:
    max_score = 5000.0
    return max(0.0, max_score - distance_km)
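# Illustrative only -- a worked example of the scoring rule above: haversine_km() returns
# the great-circle distance in kilometres, and score_from_distance_km() awards
# 5000 - distance, floored at 0. So a guess 1000 km off scores 4000 points, a perfect
# guess scores 5000, and anything 5000 km or further away scores 0.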
def build_street_html(image_url: str) -> str:
    # Street-view pane template; __IMG_URL__ is substituted with the round's image URL.
    base = """
"""
    return base.replace('__IMG_URL__', image_url)


def gr_start_game(difficulty: str, username: str, request: gr.Request):
    rounds: List[Dict[str, Any]] = []
    date_str = datetime.now(timezone.utc).date().isoformat()
    game_id = str(uuid.uuid4())  # Generate unique game ID
    for _ in range(3):
        loc = pick_random_location(difficulty)
        round_id = generate_id()
        rounds.append({
            'id': round_id,
            'lat': loc['lat'],
            'lng': loc['lng'],
            'image_url': street_view_image_url(loc['lat'], loc['lng']),
            'human_guess': None,
            'ai_guess': None,
            'human_score': 0.0,
            'ai_score': 0.0,
        })
    user_sessions[username] = {
        'game_id': game_id,
        'difficulty': difficulty,
        'rounds': rounds,
        'total_score': 0.0,
        'completed': False,
        'date': date_str,
    }
    r0 = rounds[0]
    street_html = build_street_html(r0['image_url'])
    return rounds, 0, r0['id'], street_html, "", ""


def get_round(username: str, round_id: str) -> Optional[Dict[str, Any]]:
    session_data = user_sessions.get(username)
    if not session_data:
        return None
    for r in session_data['rounds']:
        if r['id'] == round_id:
            return r
    return None


def gr_submit_guess(round_id: str, lat: float, lng: float, username: str, request: gr.Request):
    rnd = get_round(username, round_id)
    if not rnd:
        return "", "Round not found", gr.update(), gr.update(), gr.update()
    distance_km = haversine_km(rnd['lat'], rnd['lng'], float(lat), float(lng))
    score = score_from_distance_km(distance_km)
    rnd['human_guess'] = {'lat': float(lat), 'lng': float(lng)}
    rnd['human_score'] = score
    rnd['human_distance_km'] = float(distance_km)
    result_text = f"Your guess was {distance_km:.2f} km away. You scored {score:.0f} points."
    # Interim scoreboard shown while the model is still analyzing the image.
    scoreboard_html = (
        f"Human Guess Recorded"
        f"{result_text}"
        f" Distance: {distance_km:.2f} km"
        f" Score: {score:.0f} pts"
        f"AI analysis will be added once the model finishes."
    )

    # Results popup shown after the human guess; __SCOREBOARD__ and the coordinate
    # placeholders are substituted below (the AI slots temporarily mirror the human
    # guess until the model produces its own).
    popup_html = """
Round Results

__SCOREBOARD__
G = Ground Truth H = Human A = AI
""".replace('__SCOREBOARD__', scoreboard_html)\
        .replace('__RND_LAT__', str(rnd['lat']))\
        .replace('__RND_LNG__', str(rnd['lng']))\
        .replace('__H_LAT__', str(float(lat)))\
        .replace('__H_LNG__', str(float(lng)))\
        .replace('__AI_LAT__', str(float(lat)))\
        .replace('__AI_LNG__', str(float(lng)))\
        .replace('__GMAPS_KEY__', GOOGLE_MAPS_API_KEY or '')
    return popup_html, result_text, rnd['lat'], rnd['lng'], score


def extract_coords_from_text(text: str):
    import re
    m = re.search(
        r"\[ANSWER\]\s*([+-]?\d+(?:\.\d+)?)\s*,\s*([+-]?\d+(?:\.\d+)?)\s*\[/ANSWER\]",
        text,
        re.IGNORECASE,
    )
    if not m:
        return None
    try:
        lat = float(m.group(1))
        lng = float(m.group(2))
        return {'lat': lat, 'lng': lng}
    except Exception:
        # If the matched numbers cannot be parsed, return 0,0 rather than falling back to geocoding.
        return {'lat': 0, 'lng': 0}
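# Illustrative only -- the answer contract the prompts request and that
# extract_coords_from_text() parses (the coordinates below are placeholders):
#
#     sample = "[THINK]...reasoning...[/THINK]The clues point to Paris. [ANSWER]48.85,2.29[/ANSWER]"
#     extract_coords_from_text(sample)   # -> {'lat': 48.85, 'lng': 2.29}
#
# When no [ANSWER] block is found, the caller falls back to geocode_text_to_coords() below.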
def geocode_text_to_coords(query: str) -> Optional[Dict[str, float]]:
    if not GOOGLE_MAPS_API_KEY:
        return None
    resp = requests.get(
        'https://maps.googleapis.com/maps/api/geocode/json',
        params={'address': query, 'key': GOOGLE_MAPS_API_KEY},
    )
    try:
        j = resp.json()
        if j.get('results'):
            loc = j['results'][0]['geometry']['location']
            return {'lat': loc['lat'], 'lng': loc['lng']}
    except Exception:
        return None
    return None


def format_coords(coords: Optional[Dict[str, float]]) -> str:
    if not coords or 'lat' not in coords or 'lng' not in coords:
        return "N/A"
    return f"lat: {coords['lat']:.2f}, lng: {coords['lng']:.2f}"


load_zones_from_file()


def _read_text(path: str) -> str:
    try:
        with open(path, 'r') as f:
            return f.read()
    except Exception:
        return ""


APP_CSS = _read_text('static/style.css') + "\n#lat_box, #lng_box { display:none; }\n" + """
#next_btn { position: absolute; left: -9999px; }
#lobby_group, #game_group{max-width:1024px;margin:24px auto;padding:16px;}
#start_btn{height:48px;font-weight:700}
.gradio-container{background:#FFFBEB}
body, .gradio-container, .gradio-container *{color:#111 !important}
/* force markdown text to be dark */
.gradio-container .prose, .gradio-container .prose *{color:#111 !important}
/* override: LLM output textbox uses white font on dark background */
#ai_chat, #ai_chat *{color:#fff !important}
#ai_chat textarea{background:#111 !important;color:#fff !important}
#ai_chat label{color:#fff !important}
/* difficulty dropdown white text */
#difficulty_select, #difficulty_select *{color:#fff !important}
/* keep dropdown menu items readable */
.svelte-3lgy39 .wrap-inner, .wrap-inner{ color: inherit; }
#popup-overlay, #popup-overlay * {color:#f9fafb !important}
#popup-overlay #ai-analysis-box, #popup-overlay #ai-analysis-box * { color: #1e293b !important; }
/* Game Over summary text in orange */
.gradio-container div[style*='text-align:center'] h2,
.gradio-container div[style*='text-align:center'] h3,
.gradio-container div[style*='text-align:center'] p { color: #FA500F !important; }
/* Remove all grey borders */
.gradio-container button, .gradio-container .form, .gradio-container .block,
.gradio-container input, .gradio-container textarea, .gradio-container select,
.gradio-container .gr-box, .gradio-container .gr-input, .gradio-container .gr-form { border: none !important; }
/* Style login button to be compact */
#login_button {max-width: 180px !important;}
#login_button button {padding: 6px 14px !important; font-size: 0.9rem !important;}
/* Style Hugging Face GPU notifications */
.toast-wrap, .toast-body {background: #f8fafc !important; color: #111 !important; border: none !important;}
.toast-title {color: #111 !important;}
.toast-text {color: #334155 !important;}
.toast-close {color: #111
!important;} """ # Client boot JS to initialize the mini-map reliably in Gradio (scripts in HTML are sanitized) APP_BOOT_JS = """ () => { const GMAPS_KEY = "__GMAPS_KEY__"; const log = (...a) => { try { console.log('[boot]', ...a); } catch(_) {} }; function ensureMapsLoaded(cb) { if (window.google && google.maps) return cb(); if (!GMAPS_KEY) { log('No GOOGLE_MAPS_API_KEY; mini-map disabled'); return; } window.__gmapsQueue = window.__gmapsQueue || []; window.__gmapsQueue.push(cb); if (window.__gmapsLoading) return; window.__gmapsLoading = true; window.__mini_cb__ = () => { log('Google Maps ready'); const q = window.__gmapsQueue || []; q.forEach(fn => { try { fn(); } catch(_) {} }); window.__gmapsQueue = []; }; const s = document.createElement('script'); s.async = true; s.defer = true; s.dataset.gmapsLoader = '1'; s.src = 'https://maps.googleapis.com/maps/api/js?key=' + GMAPS_KEY + '&callback=__mini_cb__'; s.onerror = () => log('Failed to load Google Maps script'); document.head.appendChild(s); } function initMiniMapIfPresent() { const el = document.getElementById('mini-map'); if (!el || el.dataset.initialized === '1') return; ensureMapsLoaded(() => { try { const map = new google.maps.Map(el, { center: { lat: 0, lng: 0 }, zoom: 1, streetViewControl: false, mapTypeControl: false, fullscreenControl: false }); window._miniMapInstance = map; el.dataset.initialized = '1'; let marker=null; map.addListener('click',(e)=>{ if(marker) marker.setMap(null); marker=new google.maps.Marker({position:e.latLng, map}); const latBox=document.querySelector('#lat_box input, #lat_box textarea, #lat_box input[type=number]'); const lngBox=document.querySelector('#lng_box input, #lng_box textarea, #lng_box input[type=number]'); if(latBox){ latBox.value=e.latLng.lat(); latBox.dispatchEvent(new Event('input',{bubbles:true})); } if(lngBox){ lngBox.value=e.latLng.lng(); lngBox.dispatchEvent(new Event('input',{bubbles:true})); } }); setTimeout(() => { try { google.maps.event.trigger(map, 'resize'); map.setCenter({ lat: 0, lng: 0 }); } catch(_) {} }, 150); log('Mini-map initialized'); } catch (e) { log('Mini-map init error', e); } }); } function initPopupIfPresent() { log('initPopupIfPresent called'); const el = document.getElementById('popup-map'); if (!el || el.dataset.initialized === '1') return; log("Raw AI dataset values:", { lat: el.dataset.aiLat, lng: el.dataset.aiLng }); const rnd = { lat: parseFloat(el.dataset.rndLat), lng: parseFloat(el.dataset.rndLng) }; const human = { lat: parseFloat(el.dataset.hLat), lng: parseFloat(el.dataset.hLng) }; const ai = { lat: parseFloat(el.dataset.aiLat), lng: parseFloat(el.dataset.aiLng) }; log("Parsed AI coords:", ai); ensureMapsLoaded(() => { try { log('Popup map element found, ensuring maps loaded...'); const mapOpts={zoom:6,center:rnd,mapTypeControl:false,streetViewControl:false,fullscreenControl:false}; const m = new google.maps.Map(el, mapOpts); el.dataset.initialized = '1'; const bounds = new google.maps.LatLngBounds(); const markerIcon = (fill, stroke) => ({ path: google.maps.SymbolPath.CIRCLE, scale: 9.5, fillColor: fill, fillOpacity: 1, strokeColor: stroke, strokeWeight: 2 }); const markerLabel = (text) => ({ text, color: '#ffffff', fontWeight: '700', fontSize: '12px' }); log("Marker label style:", markerLabel("A")); const gMk = new google.maps.Marker({ position: rnd, map: m, label: markerLabel('G'), icon: markerIcon('#22c55e', '#166534') }); bounds.extend(gMk.getPosition()); if (Number.isFinite(ai.lat) && Number.isFinite(ai.lng)) { log("AI coordinates are valid, 
creating marker."); const aMk = new google.maps.Marker({ position: ai, map: m, label: markerLabel('M'), icon: markerIcon('#FA500F', '#c2410c') }); bounds.extend(aMk.getPosition()); new google.maps.Polyline({ path: [rnd, ai], geodesic: true, strokeColor: '#FA500F', strokeOpacity: 1.0, strokeWeight: 2, map: m }); } else { log("AI coordinates are NOT valid, skipping marker creation."); } if (Number.isFinite(human.lat) && Number.isFinite(human.lng)) { const hMk = new google.maps.Marker({ position: human, map: m, label: markerLabel('H'), icon: markerIcon('#2563EB', '#1e3a8a') }); bounds.extend(hMk.getPosition()); new google.maps.Polyline({ path: [rnd, human], geodesic: true, strokeColor: '#2563EB', strokeOpacity: 1.0, strokeWeight: 2, map: m }); } const ne = bounds.getNorthEast(); const sw = bounds.getSouthWest(); if (ne && sw && ne.equals(sw)) { m.setCenter(ne); log("Setting zoom to 18"); m.setZoom(18); } else { log("Fitting map to bounds"); m.fitBounds(bounds); } setTimeout(() => { try { google.maps.event.trigger(m, 'resize'); const ne2 = bounds.getNorthEast(); const sw2 = bounds.getSouthWest(); if (ne2 && sw2 && ne2.equals(sw2)) { m.setCenter(ne2); log("Setting zoom to 18 after resize"); m.setZoom(18); } else { log("Fitting map to bounds after resize"); m.fitBounds(bounds); } } catch (e) { log('Resize error', e); } }, 120); const closeButtons = [document.getElementById('popup-close-next'), document.getElementById('popup-close-next-footer')]; closeButtons.forEach((btn) => { if (!btn) return; if (!btn.dataset.bound) { btn.addEventListener('click', () => { const nxt = document.getElementById('next_btn'); if (nxt) nxt.click(); }); btn.dataset.bound = '1'; } }); window.addEventListener('keydown', (ev) => { if (ev.key === 'Escape') { const nxt = document.getElementById('next_btn'); if (nxt) nxt.click(); } }, { once: true }); log('Popup map initialized'); } catch (e) { log('Popup map init error', e); } }); } function initFullscreenButtonIfPresent() { log('initFullscreenButtonIfPresent called'); const btn = document.getElementById('fullscreen-btn'); const wrapper = document.getElementById('fullscreen-wrapper'); const img = document.getElementById('street-image'); if (!btn || !wrapper || !img || btn.dataset.initialized === '1') return; const enterIcon = ''; const exitIcon = ''; btn.innerHTML = enterIcon; const originalImgParentStyle = img.parentElement.style; const originalImgStyle = { width: img.style.width, height: img.style.height, objectFit: img.style.objectFit, borderRadius: img.style.borderRadius, }; btn.addEventListener('click', () => { if (!document.fullscreenElement) { wrapper.requestFullscreen().catch(err => { log('Fullscreen error:', err); }); } else { document.exitFullscreen(); } }); document.addEventListener('fullscreenchange', () => { if (document.fullscreenElement === wrapper) { btn.innerHTML = exitIcon; img.style.width = '100%'; img.style.height = '100%'; img.style.objectFit = 'contain'; img.style.borderRadius = '0'; } else { btn.innerHTML = enterIcon; img.style.width = originalImgStyle.width; img.style.height = originalImgStyle.height; img.style.objectFit = originalImgStyle.objectFit; img.style.borderRadius = originalImgStyle.borderRadius; } }); btn.dataset.initialized = '1'; log('Fullscreen button initialized'); } function initMapControlsIfPresent() { const plusBtn = document.getElementById('map-size-plus'); const minusBtn = document.getElementById('map-size-minus'); const mapWrap = document.getElementById('mini-map-wrap'); if (!plusBtn || !minusBtn || !mapWrap || 
mapWrap.dataset.controlsInitialized) return;
    const sizes = [{w: 200, h: 130}, {w: 280, h: 180}, {w: 400, h: 260}, {w: 550, h: 360}];
    let currentSizeIndex = 1;
    const updateSize = () => {
      const newSize = sizes[currentSizeIndex];
      mapWrap.style.width = newSize.w + 'px';
      mapWrap.style.height = newSize.h + 'px';
      if (window._miniMapInstance) {
        setTimeout(() => { google.maps.event.trigger(window._miniMapInstance, 'resize'); }, 300);
      }
      log('Map size changed to', newSize);
    };
    plusBtn.addEventListener('click', (e) => { e.stopPropagation(); if (currentSizeIndex < sizes.length - 1) { currentSizeIndex++; updateSize(); } });
    minusBtn.addEventListener('click', (e) => { e.stopPropagation(); if (currentSizeIndex > 0) { currentSizeIndex--; updateSize(); } });
    mapWrap.dataset.controlsInitialized = '1';
    updateSize();
    log('Map controls initialized');
  }
  const obs = new MutationObserver(() => {
    initMiniMapIfPresent();
    initPopupIfPresent();
    initMapControlsIfPresent();
    initFullscreenButtonIfPresent();
  });
  obs.observe(document.documentElement, { childList: true, subtree: true });
  initMiniMapIfPresent();
  initPopupIfPresent();
  initMapControlsIfPresent();
  initFullscreenButtonIfPresent();
}
""".replace("__GMAPS_KEY__", GOOGLE_MAPS_API_KEY or '')


with gr.Blocks(css=APP_CSS, title="LLM GeoGuessr") as demo:
    user_profile = gr.State()

    with gr.Row():
        gr.Markdown("## LLM GeoGuessr", elem_id="title_md")
        login_button = LoginButton(visible=True, elem_id="login_button", scale=0, min_width=300)

    gr.Markdown("""
### Magistral
Magistral is a multimodal model trained by Mistral AI to do visual reasoning. The model has not been trained on GeoGuessr data, nor does it have access to any tools (maps, etc.), which makes it a fair experience: it relies purely on the visual clues in the image.

### How to Play
1. **Login** using your Hugging Face account and click "Start Game".
2. You'll be shown a random Google Maps image.
3. Place a marker on the mini-map to guess the location.
4. Submit your guess and see how your score compares to the AI's!

Your score is 5000 - (distance to the ground truth in km). The game consists of 3 rounds!
""")

    login_prompt_md = gr.Markdown("### Please log in with your Hugging Face account to play.", visible=True)
    logged_in_md = gr.Markdown(visible=False)

    with gr.Group(visible=False, elem_id="lobby_group") as lobby_group:
        start_btn = gr.Button("Start Game", variant="primary", elem_id="start_btn")
        limit_msg = gr.Markdown(visible=False)

    with gr.Group(visible=False, elem_id="game_group") as game_group:
        rounds_state = gr.State([])
        idx_state = gr.State(0)
        round_id_box = gr.Textbox(visible=False)
        lat_box = gr.Number(visible=True, elem_id="lat_box", label="lat")
        lng_box = gr.Number(visible=True, elem_id="lng_box", label="lng")
        street_html = gr.HTML(visible=True)
        map_html = gr.HTML(visible=False)
        validate_btn = gr.Button("Validate Guess", visible=True)
        result_md = gr.Markdown()
        popup_html = gr.HTML()
        ai_chat = gr.Textbox(label="AI Analysis", interactive=False, elem_id="ai_chat", lines=15)
        next_btn = gr.Button("Next", visible=True, elem_id="next_btn")
        final_md = gr.Markdown(visible=True)

    def on_login(token: gr.OAuthToken | None):
        if not token:
            return (
                None,
                gr.update(visible=True),
                gr.update(visible=True),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(),
                gr.update(visible=False),
            )
        try:
            profile = whoami(token=token.token)
            username = profile["name"]
            # Use server token to read the dataset, not user's token
            todays_games = data_manager.get_todays_games(token=SERVER_HF_TOKEN)
            # Only check if user played today if BLOCK_MULTIPLE_GAMES is enabled
            has_played = data_manager.BLOCK_MULTIPLE_GAMES and data_manager.has_user_played_today(username, todays_games)
        except Exception as e:
            gr.Warning(f"Could not check your game status. Please try again. Error: {e}")
            return (
                None,
                gr.update(visible=True),
                gr.update(visible=True),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(),
                gr.update(visible=False),
            )

        welcome_message = f"Welcome, **{profile.get('fullname', username)}**! You are logged in as **{username}**."
        updates = [
            profile,
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True, value=welcome_message),
            gr.update(visible=True),
        ]
        if has_played:
            limit_message = "You have already played today. Please come back tomorrow for a new challenge!"
            updates.extend([
                gr.update(interactive=False),
                gr.update(visible=True, value=limit_message),
            ])
        else:
            updates.extend([
                gr.update(interactive=True),
                gr.update(visible=False),
            ])
        return tuple(updates)

    # Use demo.load to set the initial UI state when the app loads with an existing token.
    # This is the key fix for the UI flickering issue.
    demo.load(
        on_login,
        outputs=[
            user_profile,
            login_prompt_md,
            login_button,
            logged_in_md,
            lobby_group,
            start_btn,
            limit_msg,
        ],
    )

    # The click handler is still needed to initiate the login flow if the user is not logged in.
    # (With Hugging Face OAuth enabled on the Space, Gradio supplies the gr.OAuthToken-typed
    # argument automatically, so neither call declares explicit inputs.)
    login_button.click(
        on_login,
        outputs=[
            user_profile,
            login_prompt_md,
            login_button,
            logged_in_md,
            lobby_group,
            start_btn,
            limit_msg,
        ],
    )

    def start_click(profile: dict, request: gr.Request):
        if not profile:
            gr.Warning("Please log in before starting the game.")
            # Return no-ops for all outputs to prevent errors
            return None, 0, "", "", "", "", gr.update(), gr.update(), gr.update()
        r, idx, rid, s_html, m_html, err = gr_start_game("easy", profile["name"], request)
        return (
            r,
            idx,
            rid,
            s_html,
            m_html,
            gr.update(value=""),
            gr.update(visible=True),
            gr.update(visible=False),
            gr.update(value="")
        )

    start_btn.click(
        start_click,
        inputs=[user_profile],
        outputs=[rounds_state, idx_state, round_id_box, street_html, map_html, result_md, game_group, lobby_group, limit_msg]
    )

    def on_validate(rid, lat, lng, profile: dict, request: gr.Request):
        if not profile:
            return
        username = profile["name"]
        _popup, txt, a_lat, a_lng, _score = gr_submit_guess(rid, lat, lng, username, request)
        yield "", txt + "\n\n[AI] Analyzing image..."
        rnd = get_round(username, rid)
        if not rnd:
            yield "", txt + "\n\n[Error] Round not found"
            return
        try:
            img_resp = requests.get(rnd['image_url'])
            img_resp.raise_for_status()
            last_text = ""
            for partial_text in llm_stream_image_text(img_resp.content):
                last_text = partial_text
                yield "", txt + "\n\n" + partial_text
            # After stream completes, compute AI guess and score
            ai_coords = extract_coords_from_text(last_text) or geocode_text_to_coords(last_text[-256:])
            if ai_coords:
                rnd['ai_guess'] = ai_coords
                ai_dist_km = haversine_km(rnd['lat'], rnd['lng'], ai_coords['lat'], ai_coords['lng'])
                rnd['ai_distance_km'] = float(ai_dist_km)
                rnd['ai_score'] = score_from_distance_km(ai_dist_km)
            # Store the AI analysis text for later recording
            rnd['ai_analysis'] = last_text
            # Record this round's data immediately to prevent abuse
            sess = user_sessions.get(username, {})
            game_id = sess.get('game_id', '')
            round_idx = next((i for i, rr in enumerate(sess.get('rounds', [])) if rr['id'] == rid), 0) + 1
            round_record = {
                "round_number": round_idx,
                "actual_location": {"lat": rnd.get('lat'), "lng": rnd.get('lng')},
                "human_guess": rnd.get('human_guess'),
                "human_distance_km": round(rnd.get('human_distance_km', 0), 2),
                "human_score": float(round(rnd.get('human_score', 0))),
                "ai_guess": rnd.get('ai_guess'),
                "ai_distance_km": round(rnd.get('ai_distance_km', 0), 2) if rnd.get('ai_distance_km') else None,
                "ai_score": float(round(rnd.get('ai_score', 0))) if rnd.get('ai_score') else 0.0,
                "ai_analysis": rnd.get('ai_analysis', ''),
            }
            data_manager.update_game_record(username, game_id, round_data=round_record)
        except Exception as e:
            yield "", txt + f"\n\n[Error] {e}"
            return

        # 3) Show popup with both guesses and update scoreboard
        sess = user_sessions.get(username, {})
        total_human = sum(float(r.get('human_score', 0.0)) for r in sess.get('rounds', []))
        total_ai = sum(float(r.get('ai_score', 0.0)) for r in sess.get('rounds', []))
        round_idx = next((i for i, rr in enumerate(sess.get('rounds', [])) if rr['id'] == rid), 0) + 1
        # Escape AI analysis to display safely inside HTML
        import html as _html
        ai_text_safe = _html.escape(last_text or "")
        summary_safe = _html.escape(txt)
        human_guess_str = format_coords(rnd.get('human_guess'))
        ai_guess_str = format_coords(rnd.get('ai_guess'))
        scoreboard_html = (
            f"Round {round_idx}"
            f"{summary_safe}"
            f" Human: {rnd.get('human_score', 0):.0f} pts ( {human_guess_str} ) ({rnd.get('human_distance_km', 0.0):.1f} km)"
            f" AI: {rnd.get('ai_score', 0):.0f} pts ( {ai_guess_str} ) ({rnd.get('ai_distance_km', 0.0):.1f} km)"
            f"Totals — Human {total_human:.0f} / AI {total_ai:.0f}"
            f"AI Analysis"
            + ai_text_safe
        )
        ai = rnd.get('ai_guess')
        ai_lat_str = ""
        ai_lng_str = ""
        if ai:
            ai_lat = float(ai.get('lat', 0.0))
            ai_lng = float(ai.get('lng', 0.0))
            ai_lat_str = str(ai_lat)
            ai_lng_str = str(ai_lng)
        # Results popup template; the placeholders below are substituted before rendering.
        popup_html = """
"""
        popup_html = (
            popup_html.replace('__SCOREBOARD__', scoreboard_html)
            .replace('__RND_LAT__', str(rnd['lat']))
            .replace('__RND_LNG__', str(rnd['lng']))
            .replace('__H_LAT__', str(float(lat)))
            .replace('__H_LNG__', str(float(lng)))
            .replace('__AI_LAT__', ai_lat_str)
            .replace('__AI_LNG__', ai_lng_str)
            .replace('__GMAPS_KEY__', GOOGLE_MAPS_API_KEY or '')
        )
        yield popup_html, (txt + ("\n\n" + (last_text or "")))

    validate_btn.click(
        on_validate,
        inputs=[round_id_box, lat_box, lng_box, user_profile],
        outputs=[popup_html, ai_chat],
    )

    def on_next(r_state: list, idx: int, profile: dict, request: gr.Request):
        if not profile:
            return idx, gr.update(), gr.update(), gr.update(), gr.update()
        username = profile["name"]
        idx += 1
        sess = user_sessions.get(username)
        if not sess:
            # No active session for this user; return no-ops instead of failing below.
            return idx, gr.update(), gr.update(), gr.update(), gr.update()
        if idx >= len(sess['rounds']):
            total_human = sum(float(r.get('human_score', 0.0)) for r in sess.get('rounds', []))
            total_ai = sum(float(r.get('ai_score', 0.0)) for r in sess.get('rounds', []))
            game_id = sess.get('game_id', '')
            # Update the final scores (rounds already recorded incrementally)
            data_manager.update_game_record(username, game_id, final_score=total_human, final_ai_score=total_ai)
            winner_message = "It's a tie!"
            if total_human > total_ai:
                winner_message = "Congratulations, you won!"
            elif total_ai > total_human:
                winner_message = "The AI won this time."
            summary_html = f"""

Game Over!

Here are the final scores:

Your Score: {total_human:.0f}

AI's Score: {total_ai:.0f}

{winner_message}
"""
            return idx, gr.update(value=summary_html), gr.update(value=""), gr.update(value=""), gr.update(value="")

        r = sess['rounds'][idx]
        s_html = build_street_html(r['image_url'])
        return idx, gr.update(value=s_html), gr.update(value=r['id']), gr.update(value=""), gr.update(value="")

    next_btn.click(
        on_next,
        inputs=[rounds_state, idx_state, user_profile],
        outputs=[idx_state, street_html, round_id_box, popup_html, ai_chat],
    )

    # Inject boot JS using load(js=callable) compatible format
    demo.load(fn=lambda: None, inputs=None, outputs=None, js=APP_BOOT_JS)


if __name__ == "__main__":
    demo.queue().launch(server_name="0.0.0.0", server_port=7860)