Spaces:
Running
Running
"""
Unified cookie manager.

Consolidates detection, loading and management of cookies that come from
JSON files and from environment variables.
"""
| import os | |
| import json | |
| from dataclasses import dataclass | |
| from typing import List, Dict, Optional | |
| from utils.paths import cookies_dir | |
| from utils.cookie_handler import convert_cookie_editor_to_playwright, convert_kv_to_playwright | |
| from utils.common import clean_env_value | |
@dataclass
class CookieSource:
    """Uniform description of one place cookies can be loaded from.

    Declared as a dataclass so that it can be constructed with keyword
    arguments (``CookieSource(type=..., identifier=..., display_name=...)``),
    which is how the rest of this module creates instances.
    """

    type: str  # "file" | "env_var"
    identifier: str  # file name, or an env var name such as "USER_COOKIE_1"
    display_name: str  # name shown in log messages
    exists: bool = True  # NOTE(review): not read anywhere in this module

    def __str__(self):
        """Return ``"<type>:<identifier>"``; CookieManager uses it as a cache key."""
        return f"{self.type}:{self.identifier}"
class CookieManager:
    """Unified cookie manager.

    Detects, loads and caches cookie data from every supported source:
    ``*.json`` files inside the cookies directory and ``USER_COOKIE_<n>``
    environment variables.
    """

    def __init__(self, logger=None):
        """Create a manager.

        Args:
            logger: Optional logger-like object exposing ``debug``/``info``/
                ``error``. When falsy, nothing is logged.
        """
        self.logger = logger
        # None means "not scanned yet"; an empty list is a valid cached result.
        self._detected_sources: Optional[List[CookieSource]] = None
        # Maps str(source) -> cookie list returned by the converters.
        self._cookie_cache: Dict[str, List[Dict]] = {}

    def _log(self, level: str, message: str) -> None:
        """Forward *message* to ``self.logger.<level>`` when a logger is set."""
        if self.logger:
            getattr(self.logger, level)(message)

    def detect_all_sources(self) -> List[CookieSource]:
        """Detect every available cookie source (JSON files + env vars).

        The result is cached on first use so the directory and environment
        are only scanned once per manager instance.

        Returns:
            File sources (sorted by file name) followed by environment
            variable sources (in index order).
        """
        if self._detected_sources is not None:
            return self._detected_sources

        sources = self._scan_cookie_files() + self._scan_env_vars()
        self._detected_sources = sources
        return sources

    def _scan_cookie_files(self) -> List[CookieSource]:
        """Return one source per ``*.json`` file in the cookies directory."""
        found: List[CookieSource] = []
        try:
            cookie_path = cookies_dir()
            if not os.path.isdir(cookie_path):
                self._log("error", f"Cookie 目录不存在: {cookie_path}")
                return found

            # os.listdir returns entries in arbitrary order; sort so that the
            # source order is stable across runs. Sub-directories whose name
            # happens to end in ".json" are not cookie files and are skipped.
            cookie_files = sorted(
                name
                for name in os.listdir(cookie_path)
                if name.lower().endswith('.json')
                and os.path.isfile(os.path.join(cookie_path, name))
            )
            found.extend(
                CookieSource(type="file", identifier=name, display_name=name)
                for name in cookie_files
            )

            if cookie_files:
                self._log("info", f"发现 {len(cookie_files)} 个 Cookie 文件")
            else:
                self._log("info", f"在 {cookie_path} 目录下未找到任何 .json 格式的 Cookie 文件")
        except Exception as e:
            # Best effort: a broken cookies directory must not prevent the
            # environment-variable scan from running.
            self._log("error", f"扫描 Cookie 目录时出错: {e}")
        return found

    def _scan_env_vars(self) -> List[CookieSource]:
        """Return one source per consecutive ``USER_COOKIE_<n>`` variable.

        Scanning starts at ``USER_COOKIE_1`` and stops at the first index
        whose cleaned value is empty, so variables after a gap are ignored.
        """
        found: List[CookieSource] = []
        index = 1
        while True:
            env_var_name = f"USER_COOKIE_{index}"
            if not clean_env_value(os.getenv(env_var_name)):
                break
            found.append(
                CookieSource(
                    type="env_var",
                    identifier=env_var_name,
                    display_name=env_var_name,
                )
            )
            index += 1

        if found:
            self._log("info", f"发现 {len(found)} 个 Cookie 环境变量")
        else:
            self._log("info", "未检测到任何 USER_COOKIE 环境变量")
        return found

    def load_cookies(self, source: CookieSource) -> List[Dict]:
        """Load cookie data from the given source.

        Successful loads are cached under ``str(source)``; failures and
        unknown source types are logged, are not cached, and yield ``[]``.

        Args:
            source: Cookie source object.

        Returns:
            The cookie list produced by the matching converter, or ``[]``
            when loading fails.
        """
        cache_key = str(source)

        # Membership test (not .get) so a cached empty list still counts.
        if cache_key in self._cookie_cache:
            self._log("debug", f"从缓存加载 Cookie: {source.display_name}")
            return self._cookie_cache[cache_key]

        loaders = {
            "file": self._load_from_file,
            "env_var": self._load_from_env,
        }
        try:
            loader = loaders.get(source.type)
            if loader is None:
                self._log("error", f"未知的 Cookie 来源类型: {source.type}")
                return []

            cookies = loader(source.identifier)
            self._cookie_cache[cache_key] = cookies
            self._log("info", f"从 {source.display_name} 加载了 {len(cookies)} 个 Cookie 数据")
        except Exception as e:
            self._log("error", f"从 {source.display_name} 加载 Cookie 时出错: {e}")
            return []

        return cookies

    def _load_from_file(self, filename: str) -> List[Dict]:
        """Load cookies from a JSON file inside the cookies directory.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        cookie_path = cookies_dir() / filename
        if not os.path.exists(cookie_path):
            raise FileNotFoundError(f"Cookie 文件不存在: {cookie_path}")

        with open(cookie_path, 'r', encoding='utf-8') as f:
            cookies_from_file = json.load(f)

        return convert_cookie_editor_to_playwright(cookies_from_file, logger=self.logger)

    def _load_from_env(self, env_var_name: str) -> List[Dict]:
        """Load cookies from the named environment variable.

        Raises:
            ValueError: If the variable is unset or empty after cleaning.
        """
        env_value = clean_env_value(os.getenv(env_var_name))
        if not env_value:
            raise ValueError(f"环境变量 {env_var_name} 不存在或为空")

        return convert_kv_to_playwright(
            env_value,
            default_domain=".google.com",
            logger=self.logger,
        )

    def get_all_sources(self) -> List[CookieSource]:
        """Return every detected cookie source (alias of detect_all_sources)."""
        return self.detect_all_sources()

    def clear_cache(self):
        """Empty the loaded-cookie cache.

        The cached list of detected sources is left untouched.
        """
        self._cookie_cache.clear()
        self._log("debug", "Cookie 缓存已清空")

    def get_source_summary(self) -> Dict[str, int]:
        """Return counts of detected sources.

        Returns:
            A dict with keys ``"total"``, ``"files"`` and ``"env_vars"``.
            Sources of any other type count only towards ``"total"``.
        """
        sources = self.detect_all_sources()
        return {
            "total": len(sources),
            "files": sum(1 for source in sources if source.type == "file"),
            "env_vars": sum(1 for source in sources if source.type == "env_var"),
        }