Spaces:
Sleeping
Sleeping
| # mapcrunch_controller.py (Fixed) | |
| from selenium import webdriver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import TimeoutException | |
| from selenium.webdriver.chrome.options import Options | |
| from typing import Dict, Optional | |
| import time | |
| # 修正: 从 config.py 导入所有需要的变量 | |
| from config import ( | |
| MAPCRUNCH_URL, | |
| SELECTORS, | |
| DATA_COLLECTION_CONFIG, | |
| MAPCRUNCH_OPTIONS, | |
| SELENIUM_CONFIG, | |
| ) | |
| class MapCrunchController: | |
| """Selenium controller for MapCrunch website automation""" | |
| def __init__(self, headless: bool = False): | |
| self.driver = None | |
| self.wait = None | |
| self.headless = headless | |
| self.setup_driver() | |
| def setup_driver(self): | |
| """Initialize Chrome driver with appropriate settings""" | |
| chrome_options = Options() | |
| if self.headless: | |
| chrome_options.add_argument("--headless") | |
| chrome_options.add_argument( | |
| f"--window-size={SELENIUM_CONFIG['window_size'][0]},{SELENIUM_CONFIG['window_size'][1]}" | |
| ) | |
| chrome_options.add_argument("--disable-gpu") | |
| chrome_options.add_argument("--no-sandbox") | |
| chrome_options.add_argument("--disable-dev-shm-usage") | |
| self.driver = webdriver.Chrome(options=chrome_options) | |
| self.driver.set_window_size(*SELENIUM_CONFIG["window_size"]) | |
| self.wait = WebDriverWait(self.driver, SELENIUM_CONFIG["implicit_wait"]) | |
| self.driver.get(MAPCRUNCH_URL) | |
| time.sleep(3) | |
| def setup_clean_environment(self): | |
| """ | |
| Forcefully enables stealth mode and hides UI elements for a clean benchmark environment. | |
| """ | |
| try: | |
| # 1. 强制开启 Stealth 模式 | |
| # 这一步确保地址信息被网站自身的逻辑隐藏 | |
| stealth_checkbox = self.wait.until( | |
| EC.presence_of_element_located( | |
| (By.CSS_SELECTOR, SELECTORS["stealth_checkbox"]) | |
| ) | |
| ) | |
| if not stealth_checkbox.is_selected(): | |
| # 使用JS点击更可靠,可以避免元素被遮挡的问题 | |
| self.driver.execute_script("arguments[0].click();", stealth_checkbox) | |
| print("✅ Stealth mode programmatically enabled for benchmark.") | |
| # 2. 用 JS 隐藏其他视觉干扰元素 | |
| # 这一步确保截图区域干净 | |
| self.driver.execute_script(""" | |
| const elementsToHide = ['#menu', '#info-box', '#social', '#bottom-box', '#topbar']; | |
| elementsToHide.forEach(sel => { | |
| const el = document.querySelector(sel); | |
| if (el) el.style.display = 'none'; | |
| }); | |
| const panoBox = document.querySelector('#pano-box'); | |
| if (panoBox) panoBox.style.height = '100vh'; | |
| """) | |
| print("✅ Clean UI configured for benchmark.") | |
| except Exception as e: | |
| print(f"⚠️ Warning: Could not fully configure clean environment: {e}") | |
| # setup_collection_options 函数保持不变... | |
| def setup_collection_options(self, options: Dict = None): | |
| if options is None: | |
| options = MAPCRUNCH_OPTIONS | |
| try: | |
| options_button = self.wait.until( | |
| EC.element_to_be_clickable( | |
| (By.CSS_SELECTOR, SELECTORS["options_button"]) | |
| ) | |
| ) | |
| options_button.click() | |
| time.sleep(1) | |
| # ... (内部逻辑和之前一样) | |
| urban_checkbox = self.driver.find_element( | |
| By.CSS_SELECTOR, SELECTORS["urban_checkbox"] | |
| ) | |
| if options.get("urban_only", False) != urban_checkbox.is_selected(): | |
| urban_checkbox.click() | |
| indoor_checkbox = self.driver.find_element( | |
| By.CSS_SELECTOR, SELECTORS["indoor_checkbox"] | |
| ) | |
| if options.get("exclude_indoor", True) == indoor_checkbox.is_selected(): | |
| indoor_checkbox.click() | |
| stealth_checkbox = self.driver.find_element( | |
| By.CSS_SELECTOR, SELECTORS["stealth_checkbox"] | |
| ) | |
| if options.get("stealth_mode", True) != stealth_checkbox.is_selected(): | |
| stealth_checkbox.click() | |
| options_button.click() | |
| time.sleep(0.5) | |
| print("✅ Collection options configured") | |
| return True | |
| except Exception as e: | |
| print(f"❌ Error configuring options: {e}") | |
| return False | |
| def _select_countries(self, country_codes: list): | |
| """Select specific countries in the options panel""" | |
| try: | |
| # First, deselect all | |
| assert self.driver is not None | |
| all_countries = self.driver.find_elements(By.CSS_SELECTOR, "#countrylist a") | |
| for country in all_countries: | |
| class_attr = country.get_attribute("class") | |
| if class_attr is not None and "hover" not in class_attr: | |
| country.click() | |
| time.sleep(0.1) | |
| # Then select desired countries | |
| for code in country_codes: | |
| country = self.driver.find_element( | |
| By.CSS_SELECTOR, f'a[data-code="{code}"]' | |
| ) | |
| class_attr = country.get_attribute("class") | |
| if class_attr is not None and "hover" in class_attr: | |
| country.click() | |
| time.sleep(0.1) | |
| print(f"✅ Selected countries: {country_codes}") | |
| except Exception as e: | |
| print(f"⚠️ Warning: Could not select countries: {e}") | |
| def click_go_button(self) -> bool: | |
| """Click the Go button to get new Street View location""" | |
| try: | |
| go_button = self.wait.until( | |
| EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["go_button"])) | |
| ) | |
| go_button.click() | |
| # 修正: DATA_COLLECTION_CONFIG 现在已被导入,可以正常使用 | |
| time.sleep(DATA_COLLECTION_CONFIG.get("wait_after_go", 5)) | |
| return True | |
| except Exception as e: | |
| # 修正: 打印出具体的错误信息 | |
| print(f"❌ Error clicking Go button: {e}") | |
| return False | |
| def get_current_address(self) -> Optional[str]: | |
| # ... (此函数不变) ... | |
| try: | |
| address_element = self.wait.until( | |
| EC.visibility_of_element_located( | |
| (By.CSS_SELECTOR, SELECTORS["address_element"]) | |
| ) | |
| ) | |
| address_text = address_element.text.strip() | |
| address_title = address_element.get_attribute("title") or "" | |
| return ( | |
| address_title | |
| if len(address_title) > len(address_text) | |
| else address_text | |
| ) | |
| except Exception: | |
| # 在stealth模式下,这个元素可能是隐藏的,所以找不到是正常的 | |
| return "Stealth Mode" | |
| # **新增**: 重新加入 get_map_element_info 函数 | |
| def get_map_element_info(self) -> Dict: | |
| """Get map element position and size for coordinate conversion.""" | |
| try: | |
| assert self.wait is not None | |
| map_element = self.wait.until( | |
| EC.presence_of_element_located( | |
| (By.CSS_SELECTOR, SELECTORS["map_container"]) | |
| ) | |
| ) | |
| rect = map_element.rect | |
| location = map_element.location | |
| return { | |
| "x": location["x"], | |
| "y": location["y"], | |
| "width": rect["width"], | |
| "height": rect["height"], | |
| "element": map_element, | |
| } | |
| except Exception as e: | |
| # 这个函数在benchmark中不是必须的,只是GeoBot初始化需要,可以优雅地失败 | |
| # print(f"⚠️ Could not get map element info: {e}") | |
| return {} | |
| def take_street_view_screenshot(self) -> Optional[bytes]: | |
| # ... (此函数不变) ... | |
| try: | |
| pano_element = self.wait.until( | |
| EC.presence_of_element_located( | |
| (By.CSS_SELECTOR, SELECTORS["pano_container"]) | |
| ) | |
| ) | |
| return pano_element.screenshot_as_png | |
| except Exception: | |
| return None | |
| def get_live_location_identifiers(self) -> Dict: | |
| # ... (此函数不变) ... | |
| try: | |
| assert self.driver is not None | |
| # 调用网站自己的JS函数来获取实时链接 | |
| live_identifiers = self.driver.execute_script(""" | |
| try { | |
| return { | |
| permLink: getPermLink(), // 调用网站自己的函数 | |
| panoId: window.panorama.getPano(), | |
| urlString: urlSlug() // 调用网站自己的函数 | |
| }; | |
| } catch (e) { | |
| return { error: e.toString() }; | |
| } | |
| """) | |
| return live_identifiers | |
| except Exception as e: | |
| print(f"❌ Error getting live identifiers: {e}") | |
| return {} | |
| def load_location_from_data(self, location_data: Dict) -> bool: | |
| # ... (此函数不变) ... | |
| try: | |
| url_to_load = location_data.get("perm_link") or location_data.get("url") | |
| if url_to_load and ("/p/" in url_to_load or "/s/" in url_to_load): | |
| print(f"✅ Loading location via perm_link: {url_to_load}") | |
| self.driver.get(url_to_load) | |
| time.sleep(4) | |
| return True | |
| # **备用方案: 根据坐标和视角手动构建链接 (来自您建议的格式)** | |
| lat = location_data.get("lat") | |
| lng = location_data.get("lng") | |
| if lat and lng: | |
| # 尝试从 identifiers 中获取视角信息 | |
| pov = "232.46_-5_0" # 默认视角 | |
| # 注意: 采集时也应该保存 pov 信息,此处为简化 | |
| url_slug = f"{lat}_{lng}_{pov}" | |
| url_to_load = f"{MAPCRUNCH_URL}/p/{url_slug}" | |
| print(f"✅ Loading location by constructing URL: {url_to_load}") | |
| self.driver.get(url_to_load) | |
| time.sleep(3) | |
| return True | |
| print( | |
| "⚠️ No valid location identifier (perm_link, url, or coords) found in data." | |
| ) | |
| return False | |
| except Exception as e: | |
| print(f"❌ Error loading location: {e}") | |
| return False | |
| def close(self): | |
| # ... (此函数不变) ... | |
| if self.driver: | |
| self.driver.quit() | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.close() | |