# AIStudioBuildWS / utils / cookie_manager.py
# (File-hosting page residue kept for provenance: "hkfires's picture", "Upload 10 files",
#  "3085164 verified", "raw", "history blame", "6.41 kB".)
"""
Unified cookie manager.
Consolidates detection, loading and management of cookies from JSON files and environment variables.
"""
import os
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from utils.paths import cookies_dir
from utils.cookie_handler import convert_cookie_editor_to_playwright, convert_kv_to_playwright
from utils.common import clean_env_value
@dataclass
class CookieSource:
    """Uniform description of a single place cookies can be loaded from."""

    # Kind of source: "file" or "env_var".
    type: str
    # File name for a "file" source, or the variable name (e.g. "USER_COOKIE_1").
    identifier: str
    # Display name.
    display_name: str
    # Existence flag; defaults to True.
    exists: bool = True

    def __str__(self) -> str:
        """Return the source rendered as ``"<type>:<identifier>"``."""
        return "{}:{}".format(self.type, self.identifier)
class CookieManager:
    """Unified cookie manager.

    Detects the available cookie sources (``.json`` files in the cookies
    directory and ``USER_COOKIE_<n>`` environment variables), loads them
    through the project's converter helpers, and caches both the detected
    sources and the loaded cookie lists on the instance.
    """

    def __init__(self, logger=None):
        """Create a manager.

        Args:
            logger: Optional logger-like object. When it is falsy nothing is
                logged.
        """
        self.logger = logger
        # None means "not scanned yet"; an empty list is a valid scan result.
        self._detected_sources: Optional[List["CookieSource"]] = None
        # Loaded cookie lists keyed by ``str(source)``.
        self._cookie_cache: Dict[str, List[Dict]] = {}

    @staticmethod
    def _list_cookie_files(directory) -> List[str]:
        """Return the names of the ``.json`` files in *directory*, sorted.

        ``os.listdir`` yields entries in arbitrary order, so the names are
        sorted to keep the source order stable between runs. Entries that are
        not regular files (for example a sub-directory named ``x.json``) are
        skipped because they could never be opened as a cookie file.

        Args:
            directory: Path of the directory to list.

        Returns:
            Sorted file names whose lower-cased name ends with ``.json``.

        Raises:
            OSError: If the directory cannot be listed.
        """
        return sorted(
            name
            for name in os.listdir(directory)
            if name.lower().endswith(".json")
            and os.path.isfile(os.path.join(directory, name))
        )

    def _scan_cookie_files(self) -> List["CookieSource"]:
        """Build one "file" source per cookie file in the cookies directory.

        Any exception raised while scanning is logged and swallowed so that
        the environment-variable scan still runs.
        """
        found: List["CookieSource"] = []
        try:
            cookie_path = cookies_dir()
            if not os.path.isdir(cookie_path):
                if self.logger:
                    self.logger.error(f"Cookie 目录不存在: {cookie_path}")
                return found

            cookie_files = self._list_cookie_files(cookie_path)
            found.extend(
                CookieSource(type="file", identifier=name, display_name=name)
                for name in cookie_files
            )
            if self.logger:
                if cookie_files:
                    self.logger.info(f"发现 {len(cookie_files)} 个 Cookie 文件")
                else:
                    self.logger.info(f"在 {cookie_path} 目录下未找到任何 .json 格式的 Cookie 文件")
        except Exception as e:
            if self.logger:
                self.logger.error(f"扫描 Cookie 目录时出错: {e}")
        return found

    def _scan_env_vars(self) -> List["CookieSource"]:
        """Build one "env_var" source per ``USER_COOKIE_<n>`` variable.

        The scan starts at index 1 and stops at the first variable that is
        missing or empty after cleaning, so the numbering must be contiguous:
        ``USER_COOKIE_3`` is not seen when ``USER_COOKIE_2`` is absent.
        """
        found: List["CookieSource"] = []
        cookie_index = 1
        while True:
            env_var_name = f"USER_COOKIE_{cookie_index}"
            if not clean_env_value(os.getenv(env_var_name)):
                break
            found.append(
                CookieSource(
                    type="env_var",
                    identifier=env_var_name,
                    display_name=env_var_name,
                )
            )
            cookie_index += 1

        if self.logger:
            if found:
                self.logger.info(f"发现 {len(found)} 个 Cookie 环境变量")
            else:
                self.logger.info("未检测到任何 USER_COOKIE 环境变量")
        return found

    def detect_all_sources(self) -> List["CookieSource"]:
        """Detect every available cookie source (JSON files + env variables).

        The result is cached on the instance, so the directory and the
        environment are scanned only once per manager.

        Returns:
            File sources (sorted by file name) followed by environment
            variable sources (in index order). The same list object is
            returned on every call.
        """
        if self._detected_sources is not None:
            return self._detected_sources

        sources = self._scan_cookie_files() + self._scan_env_vars()
        self._detected_sources = sources
        return sources

    def load_cookies(self, source: "CookieSource") -> List[Dict]:
        """Load the cookies of one source.

        Args:
            source: The cookie source to load.

        Returns:
            The cookie list produced by the converter for that source, or an
            empty list when the source type is unknown or loading failed.
            Successful results are cached under ``str(source)`` and the
            cached list object itself is returned on later calls.
        """
        cache_key = str(source)

        if cache_key in self._cookie_cache:
            if self.logger:
                self.logger.debug(f"从缓存加载 Cookie: {source.display_name}")
            return self._cookie_cache[cache_key]

        cookies = []
        try:
            if source.type == "file":
                cookies = self._load_from_file(source.identifier)
            elif source.type == "env_var":
                cookies = self._load_from_env(source.identifier)
            else:
                if self.logger:
                    self.logger.error(f"未知的 Cookie 来源类型: {source.type}")
                # Unknown types are not cached.
                return []

            self._cookie_cache[cache_key] = cookies
            if self.logger:
                self.logger.info(f"从 {source.display_name} 加载了 {len(cookies)} 个 Cookie 数据")
        except Exception as e:
            # Best effort: a source that cannot be loaded yields no cookies
            # instead of aborting the caller; failures are not cached.
            if self.logger:
                self.logger.error(f"从 {source.display_name} 加载 Cookie 时出错: {e}")
            return []

        return cookies

    def _load_from_file(self, filename: str) -> List[Dict]:
        """Load cookies from a JSON file inside the cookies directory.

        Args:
            filename: Name of the file, relative to ``cookies_dir()``.

        Returns:
            Whatever ``convert_cookie_editor_to_playwright`` returns for the
            parsed JSON content.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        # NOTE(review): the "/" operator assumes cookies_dir() returns a
        # pathlib.Path-like object - confirm against utils.paths.
        cookie_path = cookies_dir() / filename
        if not os.path.exists(cookie_path):
            raise FileNotFoundError(f"Cookie 文件不存在: {cookie_path}")

        with open(cookie_path, 'r', encoding='utf-8') as f:
            cookies_from_file = json.load(f)

        return convert_cookie_editor_to_playwright(cookies_from_file, logger=self.logger)

    def _load_from_env(self, env_var_name: str) -> List[Dict]:
        """Load cookies from an environment variable.

        Args:
            env_var_name: Name of the variable to read.

        Returns:
            Whatever ``convert_kv_to_playwright`` returns for the cleaned
            value, using ``.google.com`` as the default domain.

        Raises:
            ValueError: If the variable is unset or empty after cleaning.
        """
        env_value = clean_env_value(os.getenv(env_var_name))
        if not env_value:
            raise ValueError(f"环境变量 {env_var_name} 不存在或为空")

        return convert_kv_to_playwright(
            env_value,
            default_domain=".google.com",
            logger=self.logger,
        )

    def get_all_sources(self) -> List["CookieSource"]:
        """Return every detected cookie source (alias of ``detect_all_sources``)."""
        return self.detect_all_sources()

    def clear_cache(self):
        """Empty the loaded-cookie cache.

        Only the cookie cache is cleared; the cached list of detected sources
        is left untouched.
        """
        self._cookie_cache.clear()
        if self.logger:
            self.logger.debug("Cookie 缓存已清空")

    def get_source_summary(self) -> Dict[str, int]:
        """Count the detected sources by type.

        Returns:
            A dict with the keys ``"total"``, ``"files"`` and ``"env_vars"``.
        """
        sources = self.detect_all_sources()
        return {
            "total": len(sources),
            "files": sum(1 for source in sources if source.type == "file"),
            "env_vars": sum(1 for source in sources if source.type == "env_var"),
        }