File size: 6,410 Bytes
3085164
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
统一的Cookie管理器
整合JSON文件和环境变量cookie的检测、加载和管理功能
"""

import os
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from utils.paths import cookies_dir
from utils.cookie_handler import convert_cookie_editor_to_playwright, convert_kv_to_playwright
from utils.common import clean_env_value

@dataclass
class CookieSource:
    """Uniform description of one place cookies can come from."""

    # Source kind: "file" or "env_var".
    type: str
    # File name, or an environment variable name such as "USER_COOKIE_1".
    identifier: str
    # Name shown in log output.
    display_name: str
    # Defaults to True; nothing in this block reads it.
    exists: bool = True

    def __str__(self):
        """Return ``"<type>:<identifier>"``."""
        return "{}:{}".format(self.type, self.identifier)


class CookieManager:
    """Unified cookie manager.

    Detects cookie sources (``.json`` files in the cookies directory and
    ``USER_COOKIE_<n>`` environment variables), loads them through the
    project's converter helpers, and caches both the detected source list and
    the loaded cookie lists.
    """

    def __init__(self, logger=None):
        """Create a manager.

        Args:
            logger: Optional logger-like object exposing ``debug``/``info``/
                ``error``. When falsy, nothing is logged.
        """
        self.logger = logger
        self._detected_sources: Optional[List[CookieSource]] = None
        self._cookie_cache: Dict[str, List[Dict]] = {}

    def _log(self, level: str, message: str) -> None:
        """Forward *message* to ``self.logger.<level>`` if a logger is set."""
        if self.logger:
            getattr(self.logger, level)(message)

    def detect_all_sources(self) -> List[CookieSource]:
        """Detect every available cookie source (JSON files + env vars).

        The result is cached on the instance, so the directory and the
        environment are scanned only once until the detection cache is reset
        via ``clear_cache(reset_sources=True)``.

        Returns:
            File sources (sorted by file name) followed by environment
            variable sources (in index order).
        """
        if self._detected_sources is not None:
            return self._detected_sources

        sources = self._scan_cookie_files() + self._scan_env_vars()

        # Cache the result for later calls.
        self._detected_sources = sources
        return sources

    def _scan_cookie_files(self) -> List[CookieSource]:
        """Return one source per regular ``.json`` file in the cookies directory."""
        sources: List[CookieSource] = []

        try:
            cookie_path = cookies_dir()
            if not os.path.isdir(cookie_path):
                self._log("error", f"Cookie 目录不存在: {cookie_path}")
                return sources

            # os.listdir() returns entries in arbitrary order; sort so the
            # source order is deterministic across runs and platforms. Only
            # regular files are kept: a sub-directory named "*.json" can never
            # be opened as a cookie file.
            cookie_files = sorted(
                name
                for name in os.listdir(cookie_path)
                if name.lower().endswith('.json')
                and os.path.isfile(os.path.join(cookie_path, name))
            )

            sources.extend(
                CookieSource(type="file", identifier=name, display_name=name)
                for name in cookie_files
            )

            if cookie_files:
                self._log("info", f"发现 {len(cookie_files)} 个 Cookie 文件")
            else:
                self._log("info", f"在 {cookie_path} 目录下未找到任何 .json 格式的 Cookie 文件")

        except Exception as e:
            # Best effort: a broken cookies directory must not prevent the
            # environment-variable scan from running.
            self._log("error", f"扫描 Cookie 目录时出错: {e}")

        return sources

    def _scan_env_vars(self) -> List[CookieSource]:
        """Return one source per consecutive non-empty ``USER_COOKIE_<n>`` variable."""
        sources: List[CookieSource] = []
        cookie_index = 1

        # Indices are probed from 1 upwards; the first missing/empty variable
        # ends the scan, so variables after a gap are not detected.
        while True:
            env_var_name = f"USER_COOKIE_{cookie_index}"
            if not clean_env_value(os.getenv(env_var_name)):
                break

            sources.append(
                CookieSource(
                    type="env_var",
                    identifier=env_var_name,
                    display_name=env_var_name,
                )
            )
            cookie_index += 1

        if sources:
            self._log("info", f"发现 {len(sources)} 个 Cookie 环境变量")
        else:
            self._log("info", "未检测到任何 USER_COOKIE 环境变量")

        return sources

    def load_cookies(self, source: CookieSource) -> List[Dict]:
        """Load the cookies of one source, using the per-source cache.

        Args:
            source: The cookie source to load.

        Returns:
            The list produced by the converter helper for that source, or an
            empty list when the source type is unknown or loading fails
            (failures are logged, not raised).
        """
        cache_key = str(source)

        # Serve repeated requests from the cache.
        if cache_key in self._cookie_cache:
            self._log("debug", f"从缓存加载 Cookie: {source.display_name}")
            return self._cookie_cache[cache_key]

        cookies = []

        try:
            if source.type == "file":
                cookies = self._load_from_file(source.identifier)
            elif source.type == "env_var":
                cookies = self._load_from_env(source.identifier)
            else:
                self._log("error", f"未知的 Cookie 来源类型: {source.type}")
                return []

            # Only successful loads are cached; failures are retried next time.
            self._cookie_cache[cache_key] = cookies

            self._log("info", f"从 {source.display_name} 加载了 {len(cookies)} 个 Cookie 数据")

        except Exception as e:
            self._log("error", f"从 {source.display_name} 加载 Cookie 时出错: {e}")
            return []

        return cookies

    def _load_from_file(self, filename: str) -> List[Dict]:
        """Load cookies from a JSON file inside the cookies directory.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        cookie_path = cookies_dir() / filename

        if not os.path.exists(cookie_path):
            raise FileNotFoundError(f"Cookie 文件不存在: {cookie_path}")

        with open(cookie_path, 'r', encoding='utf-8') as f:
            cookies_from_file = json.load(f)

        return convert_cookie_editor_to_playwright(cookies_from_file, logger=self.logger)

    def _load_from_env(self, env_var_name: str) -> List[Dict]:
        """Load cookies from an environment variable.

        Raises:
            ValueError: If the variable is unset or empty after cleaning.
        """
        env_value = clean_env_value(os.getenv(env_var_name))

        if not env_value:
            raise ValueError(f"环境变量 {env_var_name} 不存在或为空")

        return convert_kv_to_playwright(
            env_value,
            default_domain=".google.com",
            logger=self.logger
        )

    def get_all_sources(self) -> List[CookieSource]:
        """Return all detected cookie sources (alias of ``detect_all_sources``)."""
        return self.detect_all_sources()

    def clear_cache(self, *, reset_sources: bool = False):
        """Empty the loaded-cookie cache.

        Args:
            reset_sources: When True, also forget the cached detection result
                so the next ``detect_all_sources()`` call rescans the cookies
                directory and the environment. Defaults to False, which keeps
                the detected source list.
        """
        self._cookie_cache.clear()
        if reset_sources:
            self._detected_sources = None
        self._log("debug", "Cookie 缓存已清空")

    def get_source_summary(self) -> Dict[str, int]:
        """Count the detected sources by kind.

        Returns:
            A dict with keys ``"total"``, ``"files"`` and ``"env_vars"``.
            ``"total"`` counts every detected source, including any whose type
            is neither ``"file"`` nor ``"env_var"``.
        """
        sources = self.detect_all_sources()
        summary = {
            "total": len(sources),
            "files": 0,
            "env_vars": 0
        }

        for source in sources:
            if source.type == "file":
                summary["files"] += 1
            elif source.type == "env_var":
                summary["env_vars"] += 1

        return summary