Spaces:
Running
Running
File size: 6,410 Bytes
3085164 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
"""
统一的Cookie管理器
整合JSON文件和环境变量cookie的检测、加载和管理功能
"""
import os
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from utils.paths import cookies_dir
from utils.cookie_handler import convert_cookie_editor_to_playwright, convert_kv_to_playwright
from utils.common import clean_env_value
@dataclass
class CookieSource:
    """Uniform representation of a single cookie source."""

    type: str  # source kind: "file" | "env_var"
    identifier: str  # file name, or an env var name such as "USER_COOKIE_1"
    display_name: str  # name used when showing this source (e.g. in logs)
    exists: bool = True

    def __str__(self):
        # Rendered as "<type>:<identifier>".
        return "{}:{}".format(self.type, self.identifier)
class CookieManager:
    """Unified cookie manager.

    Detects, loads and caches cookie data from two kinds of sources:
    ``.json`` files inside the cookies directory and ``USER_COOKIE_<n>``
    environment variables.
    """

    def __init__(self, logger=None):
        """Create a manager with empty caches.

        Args:
            logger: Optional logger-like object providing ``debug``, ``info``
                and ``error``. When falsy, nothing is logged.
        """
        self.logger = logger
        # None means "not scanned yet"; an empty list is a valid cached result.
        self._detected_sources: Optional[List[CookieSource]] = None
        # Loaded cookie lists, keyed by str(source).
        self._cookie_cache: Dict[str, List[Dict]] = {}

    def _log(self, level: str, message: str) -> None:
        """Forward ``message`` to ``self.logger.<level>`` if a logger is set."""
        if self.logger:
            getattr(self.logger, level)(message)

    def detect_all_sources(self) -> List[CookieSource]:
        """Detect every available cookie source (JSON files + env variables).

        The result is cached so the directory and environment are scanned only
        once; call ``clear_cache(include_sources=True)`` to force a re-scan.

        Returns:
            File sources (sorted by file name) followed by environment
            variable sources (in index order).
        """
        if self._detected_sources is None:
            self._detected_sources = (
                self._scan_cookie_files() + self._scan_env_vars()
            )
        return self._detected_sources

    def _scan_cookie_files(self) -> List[CookieSource]:
        """Return one source per ``.json`` file in the cookies directory."""
        try:
            cookie_path = cookies_dir()
            if not os.path.isdir(cookie_path):
                self._log("error", f"Cookie 目录不存在: {cookie_path}")
                return []
            # os.listdir() yields entries in arbitrary order, so sort for a
            # reproducible source order; skip directories that merely happen
            # to be named "*.json", since they can never be loaded.
            cookie_files = sorted(
                name
                for name in os.listdir(cookie_path)
                if name.lower().endswith('.json')
                and os.path.isfile(os.path.join(cookie_path, name))
            )
        except Exception as e:
            # Best-effort scan: a broken directory must not prevent the
            # environment-variable sources from being detected.
            self._log("error", f"扫描 Cookie 目录时出错: {e}")
            return []

        if cookie_files:
            self._log("info", f"发现 {len(cookie_files)} 个 Cookie 文件")
        else:
            self._log(
                "info",
                f"在 {cookie_path} 目录下未找到任何 .json 格式的 Cookie 文件",
            )
        return [
            CookieSource(type="file", identifier=name, display_name=name)
            for name in cookie_files
        ]

    def _scan_env_vars(self) -> List[CookieSource]:
        """Return one source per consecutive ``USER_COOKIE_<n>`` variable.

        Scanning starts at ``USER_COOKIE_1`` and stops at the first index
        whose value is missing or empty after ``clean_env_value``; variables
        after a gap are therefore not detected.
        """
        found: List[CookieSource] = []
        cookie_index = 1
        while True:
            env_var_name = f"USER_COOKIE_{cookie_index}"
            if not clean_env_value(os.getenv(env_var_name)):
                break
            found.append(
                CookieSource(
                    type="env_var",
                    identifier=env_var_name,
                    display_name=env_var_name,
                )
            )
            cookie_index += 1

        if found:
            self._log("info", f"发现 {len(found)} 个 Cookie 环境变量")
        else:
            self._log("info", "未检测到任何 USER_COOKIE 环境变量")
        return found

    def load_cookies(self, source: CookieSource) -> List[Dict]:
        """Load cookie data from the given source.

        Successful loads are cached per source; failures are logged and are
        not cached, so a later call retries.

        Args:
            source: The cookie source to load.

        Returns:
            The cookie list produced by the converter for this source type,
            or an empty list when the source type is unknown or loading fails.
        """
        cache_key = str(source)
        if cache_key in self._cookie_cache:
            self._log("debug", f"从缓存加载 Cookie: {source.display_name}")
            return self._cookie_cache[cache_key]

        loaders = {
            "file": self._load_from_file,
            "env_var": self._load_from_env,
        }
        loader = loaders.get(source.type)
        if loader is None:
            self._log("error", f"未知的 Cookie 来源类型: {source.type}")
            return []

        try:
            cookies = loader(source.identifier)
        except Exception as e:
            # Any loading problem (missing file, bad JSON, empty variable,
            # converter error) degrades to "no cookies" for this source.
            self._log(
                "error", f"从 {source.display_name} 加载 Cookie 时出错: {e}"
            )
            return []

        self._cookie_cache[cache_key] = cookies
        self._log(
            "info",
            f"从 {source.display_name} 加载了 {len(cookies)} 个 Cookie 数据",
        )
        return cookies

    def _load_from_file(self, filename: str) -> List[Dict]:
        """Load cookies from a JSON file in the cookies directory.

        Args:
            filename: File name relative to ``cookies_dir()``.

        Returns:
            The result of ``convert_cookie_editor_to_playwright`` applied to
            the parsed JSON.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        cookie_path = cookies_dir() / filename
        # Open directly instead of checking existence first, so the file
        # cannot disappear between the check and the open.
        try:
            with open(cookie_path, 'r', encoding='utf-8') as f:
                cookies_from_file = json.load(f)
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Cookie 文件不存在: {cookie_path}"
            ) from None
        return convert_cookie_editor_to_playwright(
            cookies_from_file, logger=self.logger
        )

    def _load_from_env(self, env_var_name: str) -> List[Dict]:
        """Load cookies from an environment variable.

        Args:
            env_var_name: Name of the environment variable to read.

        Returns:
            The result of ``convert_kv_to_playwright`` applied to the cleaned
            value, using ``.google.com`` as the default domain.

        Raises:
            ValueError: If the variable is unset or empty after cleaning.
        """
        env_value = clean_env_value(os.getenv(env_var_name))
        if not env_value:
            raise ValueError(f"环境变量 {env_var_name} 不存在或为空")
        return convert_kv_to_playwright(
            env_value,
            default_domain=".google.com",
            logger=self.logger,
        )

    def get_all_sources(self) -> List[CookieSource]:
        """Return all detected cookie sources (see ``detect_all_sources``)."""
        return self.detect_all_sources()

    def clear_cache(self, *, include_sources: bool = False):
        """Clear the loaded-cookie cache.

        Args:
            include_sources: When true, also forget the detected source list
                so the next ``detect_all_sources()`` call re-scans the
                directory and environment. Defaults to ``False``, which only
                drops the loaded cookie data.
        """
        self._cookie_cache.clear()
        if include_sources:
            self._detected_sources = None
        self._log("debug", "Cookie 缓存已清空")

    def get_source_summary(self) -> Dict[str, int]:
        """Return counts of the detected cookie sources.

        Returns:
            A dict with keys ``"total"``, ``"files"`` and ``"env_vars"``.
            Sources of any other type count only towards ``"total"``.
        """
        sources = self.detect_all_sources()
        return {
            "total": len(sources),
            "files": sum(1 for src in sources if src.type == "file"),
            "env_vars": sum(1 for src in sources if src.type == "env_var"),
        }