import os import threading import multiprocessing import signal import sys import time from browser.instance import run_browser_instance from utils.logger import setup_logging from utils.paths import cookies_dir, logs_dir from utils.cookie_manager import CookieManager from utils.common import clean_env_value, ensure_dir # 全局变量 app_running = False flask_app = None # 使用 multiprocessing.Event 实现跨进程通信 shutdown_event = multiprocessing.Event() class ProcessManager: """进程管理器,负责跟踪和管理浏览器进程""" def __init__(self): self.processes = {} # {process_id: process_info} self.lock = threading.RLock() ensure_dir(logs_dir()) self.logger = setup_logging(str(logs_dir() / 'app.log'), prefix="manager") def add_process(self, process, config=None): """添加进程到管理器""" with self.lock: pid = process.pid if process and hasattr(process, 'pid') else None # 允许添加PID为None的进程(可能还在启动中),但会记录这个情况 if pid is None: # 使用临时ID作为key,等获得真实PID后再更新 temp_id = f"temp_{len(self.processes)}" self.logger.warning(f"进程PID暂时为None,使用临时ID {temp_id}") else: temp_id = pid process_info = { 'process': process, 'config': config, 'pid': pid, 'is_alive': True, 'start_time': time.time() } self.processes[temp_id] = process_info def update_temp_pids(self): """更新临时PID为真实PID""" with self.lock: temp_ids = [k for k in self.processes.keys() if isinstance(k, str) and k.startswith("temp_")] for temp_id in temp_ids: process_info = self.processes[temp_id] process = process_info['process'] if process and hasattr(process, 'pid') and process.pid is not None: # 更新为真实PID self.processes[process.pid] = process_info del self.processes[temp_id] process_info['pid'] = process.pid def remove_process(self, pid): """从管理器中移除进程""" with self.lock: if pid in self.processes: del self.processes[pid] def get_alive_processes(self): """获取所有存活进程""" with self.lock: # 首先尝试更新临时PID self.update_temp_pids() alive = [] dead_pids = [] for pid, info in self.processes.items(): process = info['process'] try: # 检查进程是否真实存在且是子进程 if process and hasattr(process, 'is_alive') and process.is_alive(): alive.append(process) else: dead_pids.append(pid) except (ValueError, ProcessLookupError) as e: # 进程已经不存在 dead_pids.append(pid) self.logger.warning(f"进程 {pid} 检查时出错: {e}") # 清理死进程记录 for pid in dead_pids: self.remove_process(pid) return alive def terminate_all(self, timeout=10): """优雅地终止所有进程""" with self.lock: # logger = setup_logging(str(logs_dir() / 'app.log'), prefix="signal") # 直接使用 self.logger,避免重复 setup_logging # 首先更新临时PID self.update_temp_pids() if not self.processes: self.logger.info("没有活跃的进程需要关闭") return self.logger.info(f"开始关闭 {len(self.processes)} 个进程...") # 第一阶段:发送SIGTERM信号 active_pids = [] for pid, info in list(self.processes.items()): process = info['process'] try: # 检查进程对象是否有效且进程存活 if process and hasattr(process, 'is_alive') and process.is_alive() and pid is not None: self.logger.info(f"发送SIGTERM给进程 {pid} (运行时长: {time.time() - info['start_time']:.1f}秒)") process.terminate() active_pids.append(pid) else: self.logger.info(f"进程 {pid if pid is not None else 'None'} 已经停止或无效") except (ValueError, ProcessLookupError, AttributeError) as e: self.logger.warning(f"进程 {pid if pid is not None else 'None'} 访问出错: {e}") if not active_pids: self.logger.info("所有进程已经停止") return # 第二阶段:等待进程退出 self.logger.info(f"等待 {len(active_pids)} 个进程优雅退出...") start_wait = time.time() while time.time() - start_wait < 5: # 最多等待5秒 still_alive = [] for pid in active_pids: if pid in self.processes: process = self.processes[pid]['process'] try: if process and hasattr(process, 'is_alive') and process.is_alive(): still_alive.append(pid) except (ValueError, ProcessLookupError, AttributeError): pass if not still_alive: self.logger.info("所有进程已优雅退出") return time.sleep(0.5) self.logger.info(f"仍有 {len(still_alive)} 个进程在运行,准备强制关闭...") # 第三阶段:强制杀死仍在运行的进程 for pid in active_pids: if pid in self.processes and pid is not None: process = self.processes[pid]['process'] try: if process and hasattr(process, 'is_alive') and process.is_alive(): self.logger.warning(f"进程 {pid} 未响应SIGTERM,强制终止") process.kill() except (ValueError, ProcessLookupError, AttributeError) as e: self.logger.info(f"进程 {pid} 已终止: {e}") self.logger.info("所有进程关闭完成") def get_count(self): """获取管理的进程总数""" with self.lock: return len(self.processes) def get_alive_count(self): """获取存活进程数""" return len(self.get_alive_processes()) # 全局进程管理器 process_manager = ProcessManager() def load_instance_configurations(logger): """ 使用CookieManager解析环境变量和Cookies目录,为每个Cookie来源创建独立的浏览器实例配置。 """ # 1. 读取所有实例共享的URL shared_url = clean_env_value(os.getenv("CAMOUFOX_INSTANCE_URL")) if not shared_url: logger.error("错误: 缺少环境变量 CAMOUFOX_INSTANCE_URL。所有实例需要一个共享的目标URL") return None, None # 2. 读取全局设置 global_settings = { "headless": clean_env_value(os.getenv("CAMOUFOX_HEADLESS")) or "virtual", "url": shared_url # 所有实例都使用这个URL } proxy_value = clean_env_value(os.getenv("CAMOUFOX_PROXY")) if proxy_value: global_settings["proxy"] = proxy_value # 3. 使用CookieManager检测所有Cookie来源 cookie_manager = CookieManager(logger) sources = cookie_manager.detect_all_sources() # 检查是否有任何Cookie来源 if not sources: logger.error("错误: 未找到任何Cookie来源(既没有JSON文件,也没有环境变量Cookie)") return None, None # 4. 为每个Cookie来源创建实例配置 instances = [] for source in sources: if source.type == "file": instances.append({ "cookie_file": source.identifier, "cookie_source": source }) elif source.type == "env_var": # 从环境变量名中提取索引,如 "USER_COOKIE_1" -> 1 env_index = source.identifier.split("_")[-1] instances.append({ "cookie_file": None, "env_cookie_index": int(env_index), "cookie_source": source }) logger.info(f"将启动 {len(instances)} 个浏览器实例") return global_settings, instances def start_browser_instances(run_mode="standalone"): """启动浏览器实例的核心逻辑""" global app_running, process_manager, shutdown_event log_dir = logs_dir() logger = setup_logging(str(log_dir / 'app.log')) logger.info("---------------------Camoufox 实例管理器开始启动---------------------") start_delay = int(os.getenv("INSTANCE_START_DELAY", "30")) logger.info(f"运行模式: {run_mode}; 实例启动间隔: {start_delay} 秒") global_settings, instance_profiles = load_instance_configurations(logger) if not instance_profiles: logger.error("错误: 环境变量中未找到任何实例配置") return for i, profile in enumerate(instance_profiles, 1): if not app_running: break final_config = global_settings.copy() final_config.update(profile) if 'url' not in final_config: logger.warning(f"警告: 跳过一个无效的配置项 (缺少 url): {profile}") continue cookie_source = final_config.get('cookie_source') if cookie_source: if cookie_source.type == "file": logger.info( f"正在启动第 {i}/{len(instance_profiles)} 个浏览器实例 (file: {cookie_source.display_name})..." ) elif cookie_source.type == "env_var": logger.info( f"正在启动第 {i}/{len(instance_profiles)} 个浏览器实例 (env: {cookie_source.display_name})..." ) else: logger.error(f"错误: 配置中缺少cookie_source对象") continue # 传递 shutdown_event 给子进程 process = multiprocessing.Process(target=run_browser_instance, args=(final_config, shutdown_event)) process.start() # 等待一小段时间让进程获得PID,然后再添加到管理器 time.sleep(0.1) process_manager.add_process(process, final_config) # 等待配置的时间,避免并发启动导致的高CPU占用 # 即使是最后一个实例,也等待一段时间让其初始化,然后再进入主循环 time.sleep(start_delay) # 等待所有进程 previous_count = None last_log_time = 0 try: while app_running: alive_processes = process_manager.get_alive_processes() current_count = len(alive_processes) # 仅在数量变化或间隔一段时间后再记录,避免过于频繁的日志 now = time.time() if current_count != previous_count or now - last_log_time >= 600: logger.info(f"当前运行的浏览器实例数: {current_count}") previous_count = current_count last_log_time = now if not alive_processes: logger.info("所有浏览器进程已结束,主进程即将退出") break # 等待进程并清理死进程 for process in alive_processes: try: process.join(timeout=1) except: pass time.sleep(1) except KeyboardInterrupt: logger.info("捕获到键盘中断信号,等待信号处理器完成关闭...") # 不在这里关闭进程,让信号处理器统一处理 pass # 确保在所有进程结束后退出 logger.info("浏览器实例管理器运行结束") def run_standalone_mode(): """独立模式""" global app_running app_running = True start_browser_instances(run_mode="standalone") def run_server_mode(): """服务器模式""" global app_running, flask_app log_dir = logs_dir() server_logger = setup_logging(str(log_dir / 'app.log'), prefix="server") # 动态导入 Flask(只在需要时) try: from flask import Flask, jsonify flask_app = Flask(__name__) except ImportError: server_logger.error("错误: 服务器模式需要 Flask,请安装: pip install flask") return app_running = True # 在后台线程中启动浏览器实例 browser_thread = threading.Thread(target=lambda: start_browser_instances(run_mode="server"), daemon=True) browser_thread.start() # 定义路由 @flask_app.route('/health') def health_check(): """健康检查端点""" global process_manager running_count = process_manager.get_alive_count() total_count = process_manager.get_count() return jsonify({ 'status': 'healthy', 'browser_instances': total_count, 'running_instances': running_count, 'message': f'Application is running with {running_count} active browser instances' }) @flask_app.route('/') def index(): """主页端点""" global process_manager running_count = process_manager.get_alive_count() total_count = process_manager.get_count() return jsonify({ 'status': 'running', 'browser_instances': total_count, 'running_instances': running_count, 'run_mode': 'server', 'message': 'Camoufox Browser Automation is running in server mode' }) # 禁用 Flask 的默认日志 import logging log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) # 启动 Flask 服务器 try: flask_app.run(host='0.0.0.0', port=7860, debug=False) except KeyboardInterrupt: server_logger.info("服务器正在关闭...") def signal_handler(signum, frame): """统一的信号处理器 - 只有主进程应该执行这个逻辑""" global app_running, process_manager, shutdown_event # 立即设置日志,确保能看到后续信息 logger = setup_logging(str(logs_dir() / 'app.log'), prefix="signal") logger.info(f"接收到信号 {signum},开始处理...") # 检查是否是主进程,防止子进程执行关闭逻辑 current_pid = os.getpid() # 使用一个简单的方法来判断:如果是子进程,通常没有全局变量 process_manager 的控制权 # 或者通过判断 multiprocessing.current_process().name if multiprocessing.current_process().name != 'MainProcess': # 子进程接收到信号,通常应该由主进程来管理,或者子进程会因为主进程发送的SIGTERM而终止 # 这里我们选择忽略,让主进程通过terminate来管理,或者子进程通过shutdown_event来退出 logger.info(f"子进程 {current_pid} 接收到信号 {signum},忽略主进程信号处理逻辑") return logger.info(f"主进程 {current_pid} 接收到信号 {signum},正在关闭应用...") # 1. 立即设置全局标志,阻止新的进程创建 app_running = False # 2. 设置跨进程关闭事件,通知所有子进程优雅退出 try: shutdown_event.set() logger.info("已设置全局关闭事件 (shutdown_event)") except Exception as e: logger.error(f"设置关闭事件时发生错误: {e}") # 3. 调用进程管理器的优雅终止方法 try: process_manager.terminate_all(timeout=10) except Exception as e: logger.error(f"调用 terminate_all 时发生错误: {e}") logger.info("应用关闭流程结束,主进程退出") sys.exit(0) def main(): """主入口函数""" # 初始化必要的目录 ensure_dir(logs_dir()) ensure_dir(cookies_dir()) # 注册信号处理器 - 添加更多信号的捕获 signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) # 在某些环境中可能还有其他信号 try: signal.signal(signal.SIGQUIT, signal_handler) except (ValueError, AttributeError): pass try: signal.signal(signal.SIGHUP, signal_handler) except (ValueError, AttributeError): pass # 检查运行模式环境变量 hg_mode = os.getenv('HG', '').lower() if hg_mode == 'true': run_server_mode() else: run_standalone_mode() if __name__ == "__main__": multiprocessing.freeze_support() main()