""" ⚡ Speed-Optimized Multi-Agent RAG System for Complex Questions 병렬 처리, 동적 파이프라인으로 복잡한 질문도 빠르게 처리 Enhanced with multi-language support and improved error handling (캐싱 기능 제거 버전 + 모델 정보 보호) """ import os import json import time import asyncio import hashlib import re import sys from typing import Optional, List, Dict, Any, Tuple, Generator, AsyncGenerator from datetime import datetime, timedelta from enum import Enum from collections import deque import threading import queue from concurrent.futures import ThreadPoolExecutor, as_completed import aiohttp import requests import gradio as gr from pydantic import BaseModel, Field from dotenv import load_dotenv # 환경변수 로드 load_dotenv() # ============================================================================ # 데이터 모델 정의 # ============================================================================ class AgentRole(Enum): """에이전트 역할 정의""" SUPERVISOR = "supervisor" CREATIVE = "creative" CRITIC = "critic" FINALIZER = "finalizer" class ExecutionMode(Enum): """실행 모드 정의""" PARALLEL = "parallel" # 병렬 처리 SEQUENTIAL = "sequential" # 순차 처리 HYBRID = "hybrid" # 하이브리드 class Message(BaseModel): role: str content: str timestamp: Optional[datetime] = None class AgentResponse(BaseModel): role: AgentRole content: str processing_time: float metadata: Optional[Dict] = None # ============================================================================ # 언어 감지 유틸리티 # ============================================================================ class LanguageDetector: """언어 감지 및 처리 유틸리티""" @staticmethod def detect_language(text: str) -> str: """간단한 언어 감지""" import re # 한글 패턴 korean_pattern = re.compile('[가-힣]+') # 일본어 패턴 (히라가나, 가타카나) japanese_pattern = re.compile('[ぁ-ん]+|[ァ-ヴー]+') # 중국어 패턴 chinese_pattern = re.compile('[\u4e00-\u9fff]+') # 텍스트 길이 대비 각 언어 문자 비율 계산 text_length = len(text) if text_length == 0: return 'en' korean_chars = len(korean_pattern.findall(text)) japanese_chars = len(japanese_pattern.findall(text)) chinese_chars = len(chinese_pattern.findall(text)) # 한글 비율이 10% 이상이면 한국어 if korean_chars > 0 and (korean_chars / text_length > 0.1): return 'ko' # 일본어 문자가 있으면 일본어 elif japanese_chars > 0: return 'ja' # 중국어 문자가 있으면 중국어 elif chinese_chars > 0: return 'zh' else: return 'en' # ============================================================================ # 병렬 처리 최적화 Brave Search (개선됨) # ============================================================================ class AsyncBraveSearch: """비동기 Brave 검색 클라이언트 with retry logic""" def __init__(self, api_key: Optional[str] = None): self.api_key = api_key or os.getenv("BRAVE_SEARCH_API_KEY") self.base_url = "https://api.search.brave.com/res/v1/web/search" self.max_retries = 3 async def search_async(self, query: str, count: int = 5, lang: str = 'ko') -> List[Dict]: """비동기 검색 with retry""" if not self.api_key: return [] headers = { "Accept": "application/json", "X-Subscription-Token": self.api_key } # 언어별 파라미터 설정 lang_params = { 'ko': {"search_lang": "ko", "country": "KR"}, 'en': {"search_lang": "en", "country": "US"}, 'ja': {"search_lang": "ja", "country": "JP"}, 'zh': {"search_lang": "zh", "country": "CN"} } params = { "q": query, "count": count, "text_decorations": False, **lang_params.get(lang, lang_params['en']) } for attempt in range(self.max_retries): try: async with aiohttp.ClientSession() as session: async with session.get( self.base_url, headers=headers, params=params, timeout=aiohttp.ClientTimeout(total=5) ) as response: if response.status == 200: data = await response.json() results = [] if "web" in data and "results" in data["web"]: for item in data["web"]["results"][:count]: results.append({ "title": item.get("title", ""), "url": item.get("url", ""), "description": item.get("description", ""), "age": item.get("age", "") }) return results elif response.status == 429: # Rate limit await asyncio.sleep(2 ** attempt) continue except aiohttp.ClientError as e: if attempt < self.max_retries - 1: await asyncio.sleep(2 ** attempt) # Exponential backoff continue except Exception: pass return [] async def batch_search(self, queries: List[str], lang: str = 'ko') -> List[List[Dict]]: """여러 검색을 배치로 처리""" tasks = [self.search_async(q, lang=lang) for q in queries] results = await asyncio.gather(*tasks, return_exceptions=True) # 예외 처리 return [r if not isinstance(r, Exception) else [] for r in results] # ============================================================================ # 최적화된 Fireworks 클라이언트 (개선됨) # ============================================================================ class OptimizedFireworksClient: """최적화된 LLM 클라이언트 with language support""" def __init__(self, api_key: Optional[str] = None): self.api_key = api_key or os.getenv("FIREWORKS_API_KEY") if not self.api_key: raise ValueError("FIREWORKS_API_KEY is required!") self.base_url = "https://api.fireworks.ai/inference/v1/chat/completions" self.headers = { "Accept": "application/json", "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } # 항상 최고 성능 모델 사용 (복잡한 질문 전제) self.model = "accounts/fireworks/models/qwen3-235b-a22b-instruct-2507" self.max_retries = 3 def compress_prompt(self, text: str, max_length: int = 2000) -> str: """프롬프트 압축""" if len(text) <= max_length: return text # 중요한 부분 우선순위로 자르기 sentences = text.split('.') compressed = [] current_length = 0 for sentence in sentences: if current_length + len(sentence) > max_length: break compressed.append(sentence) current_length += len(sentence) return '.'.join(compressed) async def chat_stream_async( self, messages: List[Dict], **kwargs ) -> AsyncGenerator[str, None]: """비동기 스트리밍 대화 with retry""" payload = { "model": self.model, "messages": messages, "max_tokens": kwargs.get("max_tokens", 2000), "temperature": kwargs.get("temperature", 0.7), "top_p": kwargs.get("top_p", 1.0), "top_k": kwargs.get("top_k", 40), "stream": True } for attempt in range(self.max_retries): try: async with aiohttp.ClientSession() as session: async with session.post( self.base_url, headers={**self.headers, "Accept": "text/event-stream"}, json=payload, timeout=aiohttp.ClientTimeout(total=30) ) as response: async for line in response.content: line_str = line.decode('utf-8').strip() if line_str.startswith("data: "): data_str = line_str[6:] if data_str == "[DONE]": break try: data = json.loads(data_str) if "choices" in data and len(data["choices"]) > 0: delta = data["choices"][0].get("delta", {}) if "content" in delta: yield delta["content"] except json.JSONDecodeError: continue return # Success except aiohttp.ClientError as e: if attempt < self.max_retries - 1: await asyncio.sleep(2 ** attempt) continue else: yield f"Error after {self.max_retries} attempts: {str(e)}" except Exception as e: yield f"Unexpected error: {str(e)}" break # ============================================================================ # 경량화된 추론 체인 (다국어 지원) # ============================================================================ class LightweightReasoningChain: """빠른 추론을 위한 템플릿 기반 시스템""" def __init__(self): self.templates = { "ko": { "problem_solving": { "steps": ["문제 분해", "핵심 요인", "해결 방안", "구현 전략"], "prompt": "체계적으로 단계별로 분석하고 해결책을 제시하세요." }, "creative_thinking": { "steps": ["기존 접근", "창의적 대안", "혁신 포인트", "실행 방법"], "prompt": "기존 방식을 넘어선 창의적이고 혁신적인 접근을 제시하세요." }, "critical_analysis": { "steps": ["현황 평가", "강점/약점", "기회/위협", "개선 방향"], "prompt": "비판적 관점에서 철저히 분석하고 개선점을 도출하세요." } }, "en": { "problem_solving": { "steps": ["Problem Breakdown", "Key Factors", "Solutions", "Implementation Strategy"], "prompt": "Systematically analyze step by step and provide solutions." }, "creative_thinking": { "steps": ["Traditional Approach", "Creative Alternatives", "Innovation Points", "Execution Method"], "prompt": "Provide creative and innovative approaches beyond conventional methods." }, "critical_analysis": { "steps": ["Current Assessment", "Strengths/Weaknesses", "Opportunities/Threats", "Improvement Direction"], "prompt": "Thoroughly analyze from a critical perspective and derive improvements." } } } def get_reasoning_structure(self, query_type: str, lang: str = 'ko') -> Dict: """쿼리 유형에 맞는 추론 구조 반환""" lang_templates = self.templates.get(lang, self.templates['en']) return lang_templates.get(query_type, lang_templates["problem_solving"]) def get_reasoning_pattern(self, query: str, lang: str = 'ko') -> Optional[Dict]: """쿼리에 적합한 추론 패턴 반환""" query_lower = query.lower() # 언어별 키워드 매핑 pattern_keywords = { 'ko': { 'problem_solving': ['해결', '방법', '전략', '계획'], 'creative_thinking': ['창의적', '혁신적', '새로운', '아이디어'], 'critical_analysis': ['분석', '평가', '비교', '영향'] }, 'en': { 'problem_solving': ['solve', 'solution', 'strategy', 'plan'], 'creative_thinking': ['creative', 'innovative', 'novel', 'idea'], 'critical_analysis': ['analyze', 'evaluate', 'compare', 'impact'] } } keywords = pattern_keywords.get(lang, pattern_keywords['en']) for pattern_type, words in keywords.items(): if any(word in query_lower for word in words): return self.get_reasoning_structure(pattern_type, lang) return self.get_reasoning_structure('problem_solving', lang) # ============================================================================ # 조기 종료 메커니즘 (개선됨) # ============================================================================ class QualityChecker: """품질 체크 및 조기 종료 결정""" def __init__(self, min_quality: float = 0.75): self.min_quality = min_quality self.quality_metrics = { "length": 0.2, "structure": 0.3, "completeness": 0.3, "clarity": 0.2 } def evaluate_response(self, response: str, query: str, lang: str = 'ko') -> Tuple[float, bool]: """응답 품질 평가 (언어별)""" scores = {} # 언어별 최소 길이 기준 min_length = {'ko': 500, 'en': 400, 'ja': 400, 'zh': 300} target_length = min_length.get(lang, 400) # 길이 평가 scores["length"] = min(len(response) / target_length, 1.0) # 구조 평가 (언어별 마커) structure_markers = { 'ko': ["1.", "2.", "•", "-", "첫째", "둘째", "결론", "요약"], 'en': ["1.", "2.", "•", "-", "First", "Second", "Conclusion", "Summary"], 'ja': ["1.", "2.", "•", "-", "第一", "第二", "結論", "要約"], 'zh': ["1.", "2.", "•", "-", "第一", "第二", "结论", "总结"] } markers = structure_markers.get(lang, structure_markers['en']) scores["structure"] = sum(1 for m in markers if m in response) / len(markers) # 완전성 평가 (쿼리 키워드 포함 여부) query_words = set(query.split()) response_words = set(response.split()) scores["completeness"] = len(query_words & response_words) / max(len(query_words), 1) # 명확성 평가 (문장 구조) sentence_delimiters = { 'ko': '.', 'en': '.', 'ja': '。', 'zh': '。' } delimiter = sentence_delimiters.get(lang, '.') sentences = response.split(delimiter) avg_sentence_length = sum(len(s.split()) for s in sentences) / max(len(sentences), 1) scores["clarity"] = min(avg_sentence_length / 20, 1.0) # 가중 평균 계산 total_score = sum( scores[metric] * weight for metric, weight in self.quality_metrics.items() ) should_continue = total_score < self.min_quality return total_score, should_continue # ============================================================================ # 스트리밍 최적화 (개선됨) # ============================================================================ class OptimizedStreaming: """스트리밍 버퍼 최적화 with adaptive buffering""" def __init__(self, chunk_size: int = 20, flush_interval: float = 0.05): self.chunk_size = chunk_size self.flush_interval = flush_interval self.buffer = "" self.last_flush = time.time() self.adaptive_size = chunk_size async def buffer_and_yield( self, stream: AsyncGenerator[str, None], adaptive: bool = True ) -> AsyncGenerator[str, None]: """버퍼링된 스트리밍 with adaptive sizing""" chunk_count = 0 async for chunk in stream: self.buffer += chunk current_time = time.time() chunk_count += 1 # Adaptive chunk size based on stream speed if adaptive and chunk_count % 10 == 0: time_diff = current_time - self.last_flush if time_diff < 0.02: # Too fast, increase buffer self.adaptive_size = min(self.adaptive_size + 5, 100) elif time_diff > 0.1: # Too slow, decrease buffer self.adaptive_size = max(self.adaptive_size - 5, 10) if (len(self.buffer) >= self.adaptive_size or current_time - self.last_flush >= self.flush_interval): yield self.buffer self.buffer = "" self.last_flush = current_time # 남은 버퍼 플러시 if self.buffer: yield self.buffer # ============================================================================ # 응답 후처리 유틸리티 # ============================================================================ class ResponseCleaner: """응답 정리 및 포맷팅""" @staticmethod def clean_response(response: str) -> str: """불필요한 마크업 제거 강화""" # 마크다운 헤더 제거 response = re.sub(r'^#{1,6}\s+', '', response, flags=re.MULTILINE) # 불필요한 구분선 제거 response = re.sub(r'\*{2,}|_{2,}|-{3,}', '', response) # 중복 공백 제거 response = re.sub(r'\n{3,}', '\n\n', response) # 특정 패턴 제거 unwanted_patterns = [ r'\| --- # 🌱 \*\*최종통합 답변:', r'\*\*–오류: ---', r'^\s*\*\*\[.*?\]\*\*\s*', # [태그] 형식 제거 r'^\s*###\s*', # ### 제거 r'^\s*##\s*', # ## 제거 r'^\s*#\s*' # # 제거 ] for pattern in unwanted_patterns: response = re.sub(pattern, '', response, flags=re.MULTILINE) return response.strip() # ============================================================================ # 통합 최적화 멀티 에이전트 시스템 (캐싱 제거 + 모델 정보 보호) # ============================================================================ class SpeedOptimizedMultiAgentSystem: """속도 최적화된 멀티 에이전트 시스템 (캐싱 없음 + 모델 정보 보호)""" def __init__(self): self.llm = OptimizedFireworksClient() self.search = AsyncBraveSearch() self.reasoning = LightweightReasoningChain() self.quality_checker = QualityChecker() self.streaming = OptimizedStreaming() self.language_detector = LanguageDetector() self.response_cleaner = ResponseCleaner() # 병렬 처리 풀 self.executor = ThreadPoolExecutor(max_workers=4) def _get_security_instruction(self, lang: str = 'ko') -> str: """모델 정보 보호 지시사항 (언어별)""" security_instructions = { 'ko': """ [핵심 보안 규칙] - 절대 AI 모델명, 버전, 개발사를 언급하지 마세요 - "저는 AI 어시스턴트입니다"라고만 표현하세요 - 모델 정보를 묻는 질문에는 "AI 어시스턴트로서 도움을 드리겠습니다"로 대응 - GPT, Claude, Gemini, LLaMA 등 어떤 모델명도 언급 금지 - 기술적 세부사항이나 아키텍처 정보 노출 금지 """, 'en': """ [Core Security Rules] - Never mention AI model name, version, or developer - Only say "I am an AI assistant" - Respond to model queries with "As an AI assistant, I'll help you" - Do not mention GPT, Claude, Gemini, LLaMA or any model names - Do not expose technical details or architecture information """, 'ja': """ [コアセキュリティルール] - AIモデル名、バージョン、開発者を絶対に言及しないでください - 「私はAIアシスタントです」とだけ表現してください - モデル情報の質問には「AIアシスタントとしてお手伝いします」と対応 - GPT、Claude、Gemini、LLaMAなどのモデル名を言及禁止 - 技術的詳細やアーキテクチャ情報を公開禁止 """, 'zh': """ [核心安全规则] - 绝对不要提及AI模型名称、版本或开发商 - 只说"我是AI助手" - 对模型查询回应"作为AI助手,我会帮助您" - 不要提及GPT、Claude、Gemini、LLaMA或任何模型名称 - 不要暴露技术细节或架构信息 """ } return security_instructions.get(lang, security_instructions['en']) def _init_compact_prompts(self, lang: str = 'ko') -> Dict: """압축된 고효율 프롬프트 (언어별 + 보안 강화)""" security_instruction = self._get_security_instruction(lang) prompts = { 'ko': { AgentRole.SUPERVISOR: f"""[감독자-구조설계] {security_instruction} 즉시분석: 핵심의도+필요정보+답변구조 출력: 5개 핵심포인트(각 1문장) 추론체계 명시 모델 정보 절대 노출 금지""", AgentRole.CREATIVE: f"""[창의성생성자] {security_instruction} 입력구조 따라 창의적 확장 실용예시+혁신접근+구체조언 불필요설명 제거 AI 모델명이나 개발사 언급 절대 금지""", AgentRole.CRITIC: f"""[비평자-검증] {security_instruction} 신속검토: 정확성/논리성/실용성 개선포인트 3개만 각 2문장 이내 모델 관련 정보 검증 시 제거""", AgentRole.FINALIZER: f"""[최종통합] {security_instruction} 모든의견 종합→최적답변 명확구조+실용정보+창의균형 바로 핵심 내용부터 시작. 불필요한 헤더나 마크업 없이. 마크다운 헤더(#, ##, ###) 사용 금지. 절대 AI 모델명, 버전, 개발사 언급 금지. "AI 어시스턴트"로만 표현.""" }, 'en': { AgentRole.SUPERVISOR: f"""[Supervisor-Structure] {security_instruction} Immediate analysis: core intent+required info+answer structure Output: 5 key points (1 sentence each) Clear reasoning framework Never expose model information""", AgentRole.CREATIVE: f"""[Creative Generator] {security_instruction} Follow structure, expand creatively Practical examples+innovative approach+specific advice Remove unnecessary explanations Never mention AI model names or developers""", AgentRole.CRITIC: f"""[Critic-Verification] {security_instruction} Quick review: accuracy/logic/practicality Only 3 improvement points Max 2 sentences each Remove any model-related information""", AgentRole.FINALIZER: f"""[Final Integration] {security_instruction} Synthesize all inputs→optimal answer Clear structure+practical info+creative balance Start with core content directly. No unnecessary headers or markup. No markdown headers (#, ##, ###). Never mention AI model name, version, or developer. Only say "AI assistant".""" }, 'ja': { AgentRole.SUPERVISOR: f"""[監督者-構造設計] {security_instruction} 即時分析:核心意図+必要情報+回答構造 出力:5つの核心ポイント(各1文) 推論体系明示 モデル情報を絶対に公開しない""", AgentRole.CREATIVE: f"""[創造性生成者] {security_instruction} 入力構造に従って創造的拡張 実用例+革新的アプローチ+具体的アドバイス 不要な説明削除 AIモデル名や開発者を絶対に言及しない""", AgentRole.CRITIC: f"""[批評者-検証] {security_instruction} 迅速レビュー:正確性/論理性/実用性 改善ポイント3つのみ 各2文以内 モデル関連情報を削除""", AgentRole.FINALIZER: f"""[最終統合] {security_instruction} 全意見統合→最適回答 明確構造+実用情報+創造性バランス 核心内容から直接開始。不要なヘッダーやマークアップなし。マークダウンヘッダー(#、##、###)使用禁止。 AIモデル名、バージョン、開発者を絶対に言及しない。「AIアシスタント」とだけ表現。""" }, 'zh': { AgentRole.SUPERVISOR: f"""[主管-结构设计] {security_instruction} 立即分析:核心意图+所需信息+答案结构 输出:5个核心要点(每个1句) 推理体系明确 绝不暴露模型信息""", AgentRole.CREATIVE: f"""[创意生成器] {security_instruction} 按结构创造性扩展 实用示例+创新方法+具体建议 删除不必要的解释 绝不提及AI模型名称或开发商""", AgentRole.CRITIC: f"""[评论家-验证] {security_instruction} 快速审查:准确性/逻辑性/实用性 仅3个改进点 每个最多2句 删除任何模型相关信息""", AgentRole.FINALIZER: f"""[最终整合] {security_instruction} 综合所有意见→最佳答案 清晰结构+实用信息+创意平衡 直接从核心内容开始。无需不必要的标题或标记。禁止使用Markdown标题(#、##、###)。 绝不提及AI模型名称、版本或开发商。只说"AI助手"。""" } } return prompts.get(lang, prompts['en']) async def parallel_process_agents( self, query: str, search_results: List[Dict], show_progress: bool = True, lang: str = None ) -> AsyncGenerator[Tuple[str, str], None]: """병렬 처리 파이프라인 (캐싱 없음 + 보안 강화)""" start_time = time.time() # 언어 자동 감지 if lang is None: lang = self.language_detector.detect_language(query) # 언어별 프롬프트 설정 (보안 지시사항 포함) self.compact_prompts = self._init_compact_prompts(lang) search_context = self._format_search_results(search_results) accumulated_response = "" agent_thoughts = "" # 추론 패턴 결정 reasoning_pattern = self.reasoning.get_reasoning_pattern(query, lang) try: # === 1단계: 감독자 + 검색 병렬 실행 === if show_progress: progress_msg = { 'ko': "🚀 병렬 처리 시작\n👔 감독자 분석 + 🔍 추가 검색 동시 진행...\n\n", 'en': "🚀 Starting parallel processing\n👔 Supervisor analysis + 🔍 Additional search in progress...\n\n", 'ja': "🚀 並列処理開始\n👔 監督者分析 + 🔍 追加検索同時進行中...\n\n", 'zh': "🚀 开始并行处理\n👔 主管分析 + 🔍 附加搜索同时进行...\n\n" } agent_thoughts = progress_msg.get(lang, progress_msg['en']) yield accumulated_response, agent_thoughts # 감독자 프롬프트 (언어별) supervisor_prompt_templates = { 'ko': f""" 질문: {query} 검색결과: {search_context} 추론패턴: {reasoning_pattern} 즉시 핵심구조 5개 제시 모델 정보는 절대 언급하지 마세요""", 'en': f""" Question: {query} Search results: {search_context} Reasoning pattern: {reasoning_pattern} Immediately provide 5 key structures Never mention model information""", 'ja': f""" 質問: {query} 検索結果: {search_context} 推論パターン: {reasoning_pattern} 即座に5つの核心構造を提示 モデル情報は絶対に言及しないでください""", 'zh': f""" 问题: {query} 搜索结果: {search_context} 推理模式: {reasoning_pattern} 立即提供5个核心结构 绝不提及模型信息""" } supervisor_prompt = supervisor_prompt_templates.get(lang, supervisor_prompt_templates['en']) supervisor_response = "" supervisor_task = self.llm.chat_stream_async( messages=[ {"role": "system", "content": self.compact_prompts[AgentRole.SUPERVISOR]}, {"role": "user", "content": supervisor_prompt} ], temperature=0.3, max_tokens=500 ) # 감독자 스트리밍 (버퍼링) async for chunk in self.streaming.buffer_and_yield(supervisor_task): supervisor_response += chunk if show_progress and len(supervisor_response) < 300: supervisor_label = { 'ko': "👔 감독자 분석", 'en': "👔 Supervisor Analysis", 'ja': "👔 監督者分析", 'zh': "👔 主管分析" } agent_thoughts = f"{supervisor_label.get(lang, supervisor_label['en'])}\n{supervisor_response[:300]}...\n\n" yield accumulated_response, agent_thoughts # === 2단계: 창의성 + 비평 준비 병렬 === if show_progress: creative_msg = { 'ko': "🎨 창의성 생성자 + 🔍 비평자 준비...\n\n", 'en': "🎨 Creative Generator + 🔍 Critic preparing...\n\n", 'ja': "🎨 創造性生成者 + 🔍 批評者準備中...\n\n", 'zh': "🎨 创意生成器 + 🔍 评论家准备中...\n\n" } agent_thoughts += creative_msg.get(lang, creative_msg['en']) yield accumulated_response, agent_thoughts # 창의성 생성 시작 (언어별) creative_prompt_templates = { 'ko': f""" 질문: {query} 감독자구조: {supervisor_response} 검색결과: {search_context} 창의적+실용적 답변 즉시생성 AI 모델 정보 언급 금지""", 'en': f""" Question: {query} Supervisor structure: {supervisor_response} Search results: {search_context} Generate creative+practical answer immediately Do not mention AI model information""", 'ja': f""" 質問: {query} 監督者構造: {supervisor_response} 検索結果: {search_context} 創造的+実用的回答即座生成 AIモデル情報を言及禁止""", 'zh': f""" 问题: {query} 主管结构: {supervisor_response} 搜索结果: {search_context} 立即生成创意+实用答案 禁止提及AI模型信息""" } creative_prompt = creative_prompt_templates.get(lang, creative_prompt_templates['en']) creative_response = "" creative_partial = "" critic_started = False critic_response = "" creative_task = self.llm.chat_stream_async( messages=[ {"role": "system", "content": self.compact_prompts[AgentRole.CREATIVE]}, {"role": "user", "content": creative_prompt} ], temperature=0.8, max_tokens=1500 ) # 창의성 스트리밍 + 비평자 조기 시작 async for chunk in self.streaming.buffer_and_yield(creative_task): creative_response += chunk creative_partial += chunk # 창의성 응답이 500자 넘으면 비평자 시작 if len(creative_partial) > 500 and not critic_started: critic_started = True # 비평자 비동기 시작 (언어별) critic_prompt_templates = { 'ko': f""" 원본질문: {query} 창의성답변(일부): {creative_partial} 신속검토→개선점3개 모델 정보가 있으면 제거 지적""", 'en': f""" Original question: {query} Creative answer (partial): {creative_partial} Quick review→3 improvements Point out if model information exists""", 'ja': f""" 元の質問: {query} 創造的回答(一部): {creative_partial} 迅速レビュー→改善点3つ モデル情報があれば削除指摘""", 'zh': f""" 原始问题: {query} 创意答案(部分): {creative_partial} 快速审查→3个改进点 如有模型信息则指出删除""" } critic_prompt = critic_prompt_templates.get(lang, critic_prompt_templates['en']) critic_task = asyncio.create_task( self._run_critic_async(critic_prompt) ) if show_progress: display_creative = creative_response[:400] + "..." if len(creative_response) > 400 else creative_response creative_label = { 'ko': "🎨 창의성 생성자", 'en': "🎨 Creative Generator", 'ja': "🎨 創造性生成者", 'zh': "🎨 创意生成器" } agent_thoughts = f"{creative_label.get(lang, creative_label['en'])}\n{display_creative}\n\n" yield accumulated_response, agent_thoughts # 비평자 결과 대기 if critic_started: critic_response = await critic_task if show_progress: critic_label = { 'ko': "🔍 비평자 검토", 'en': "🔍 Critic Review", 'ja': "🔍 批評者レビュー", 'zh': "🔍 评论家审查" } agent_thoughts += f"{critic_label.get(lang, critic_label['en'])}\n{critic_response[:200]}...\n\n" yield accumulated_response, agent_thoughts # === 3단계: 품질 체크 및 조기 종료 === quality_score, need_more = self.quality_checker.evaluate_response( creative_response, query, lang ) if not need_more and quality_score > 0.85: # 품질이 충분히 높으면 바로 반환 accumulated_response = self.response_cleaner.clean_response(creative_response) if show_progress: quality_msg = { 'ko': f"✅ 품질 충족 (점수: {quality_score:.2f})\n조기 완료!\n", 'en': f"✅ Quality met (score: {quality_score:.2f})\nEarly completion!\n", 'ja': f"✅ 品質満足 (スコア: {quality_score:.2f})\n早期完了!\n", 'zh': f"✅ 质量满足 (分数: {quality_score:.2f})\n提前完成!\n" } agent_thoughts += quality_msg.get(lang, quality_msg['en']) yield accumulated_response, agent_thoughts return # === 4단계: 최종 통합 (스트리밍) === if show_progress: final_msg = { 'ko': "✅ 최종 통합 중...\n\n", 'en': "✅ Final integration in progress...\n\n", 'ja': "✅ 最終統合中...\n\n", 'zh': "✅ 最终整合中...\n\n" } agent_thoughts += final_msg.get(lang, final_msg['en']) yield accumulated_response, agent_thoughts # 최종 프롬프트 (언어별) final_prompt_templates = { 'ko': f""" 질문: {query} 창의성답변: {creative_response} 비평피드백: {critic_response} 감독자구조: {supervisor_response} 최종통합→완벽답변. 마크다운 헤더(#, ##, ###) 사용 금지. 절대 AI 모델명, 버전, 개발사 언급 금지.""", 'en': f""" Question: {query} Creative answer: {creative_response} Critic feedback: {critic_response} Supervisor structure: {supervisor_response} Final integration→perfect answer. No markdown headers (#, ##, ###). Never mention AI model name, version, or developer.""", 'ja': f""" 質問: {query} 創造的回答: {creative_response} 批評フィードバック: {critic_response} 監督者構造: {supervisor_response} 最終統合→完璧な回答。マークダウンヘッダー(#、##、###)使用禁止。 AIモデル名、バージョン、開発者を絶対に言及しない。""", 'zh': f""" 问题: {query} 创意答案: {creative_response} 评论反馈: {critic_response} 主管结构: {supervisor_response} 最终整合→完美答案。禁止使用Markdown标题(#、##、###)。 绝不提及AI模型名称、版本或开发商。""" } final_prompt = final_prompt_templates.get(lang, final_prompt_templates['en']) final_task = self.llm.chat_stream_async( messages=[ {"role": "system", "content": self.compact_prompts[AgentRole.FINALIZER]}, {"role": "user", "content": final_prompt} ], temperature=0.5, max_tokens=2500 ) # 최종 답변 스트리밍 accumulated_response = "" async for chunk in final_task: accumulated_response += chunk # 실시간 정리 cleaned_response = self.response_cleaner.clean_response(accumulated_response) yield cleaned_response, agent_thoughts # 최종 정리 accumulated_response = self.response_cleaner.clean_response(accumulated_response) # 처리 시간 추가 (언어별) processing_time = time.time() - start_time time_msg = { 'ko': f"\n\n---\n⚡ 처리 시간: {processing_time:.1f}초", 'en': f"\n\n---\n⚡ Processing time: {processing_time:.1f} seconds", 'ja': f"\n\n---\n⚡ 処理時間: {processing_time:.1f}秒", 'zh': f"\n\n---\n⚡ 处理时间: {processing_time:.1f}秒" } accumulated_response += time_msg.get(lang, time_msg['en']) yield accumulated_response, agent_thoughts except Exception as e: error_msg = { 'ko': f"❌ 오류 발생: {str(e)}", 'en': f"❌ Error occurred: {str(e)}", 'ja': f"❌ エラー発生: {str(e)}", 'zh': f"❌ 发生错误: {str(e)}" } yield error_msg.get(lang, error_msg['en']), agent_thoughts async def _run_critic_async(self, prompt: str) -> str: """비평자 비동기 실행 with error handling""" try: response = "" async for chunk in self.llm.chat_stream_async( messages=[ {"role": "system", "content": self.compact_prompts[AgentRole.CRITIC]}, {"role": "user", "content": prompt} ], temperature=0.2, max_tokens=500 ): response += chunk return response except Exception as e: # 언어 감지 lang = 'ko' if '질문' in prompt else 'en' error_msg = { 'ko': "비평 처리 중 오류", 'en': "Error during critic processing", 'ja': "批評処理中のエラー", 'zh': "评论处理中出错" } return error_msg.get(lang, error_msg['en']) def _format_search_results(self, results: List[Dict]) -> str: """검색 결과 압축 포맷""" if not results: return "No search results" formatted = [] for i, r in enumerate(results[:3], 1): title = r.get('title', '')[:50] desc = r.get('description', '')[:100] formatted.append(f"[{i}]{title}:{desc}") return " | ".join(formatted) # ============================================================================ # Gradio UI (최적화 버전 - 캐싱 제거 + 보안 강화) # ============================================================================ def create_optimized_gradio_interface(): """최적화된 Gradio 인터페이스 (캐싱 없음 + 모델 정보 보호)""" # 시스템 초기화 system = SpeedOptimizedMultiAgentSystem() def process_query_optimized( message: str, history: List[Dict], use_search: bool, show_agent_thoughts: bool, search_count: int, language_mode: str ): """최적화된 쿼리 처리 - 실시간 스트리밍 버전""" if not message: yield history, "", "" return # 언어 설정 if language_mode == "Auto": lang = None # 자동 감지 else: lang_map = {"Korean": "ko", "English": "en", "Japanese": "ja", "Chinese": "zh"} lang = lang_map.get(language_mode, None) # 비동기 함수를 동기적으로 실행 try: import nest_asyncio nest_asyncio.apply() except ImportError: pass try: # 검색 수행 (동기화) search_results = [] search_display = "" # 언어 자동 감지 (필요한 경우) detected_lang = lang or system.language_detector.detect_language(message) if use_search: # 검색 상태 표시 processing_msg = { 'ko': "⚡ 고속 처리 중...", 'en': "⚡ High-speed processing...", 'ja': "⚡ 高速処理中...", 'zh': "⚡ 高速处理中..." } history_with_message = history + [ {"role": "user", "content": message}, {"role": "assistant", "content": processing_msg.get(detected_lang, processing_msg['en'])} ] yield history_with_message, "", "" # 비동기 검색을 동기적으로 실행 async def search_wrapper(): return await system.search.search_async(message, count=search_count, lang=detected_lang) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) search_results = loop.run_until_complete(search_wrapper()) if search_results: ref_label = { 'ko': "📚 참고 자료", 'en': "📚 References", 'ja': "📚 参考資料", 'zh': "📚 参考资料" } search_display = f"{ref_label.get(detected_lang, ref_label['en'])}\n\n" for i, result in enumerate(search_results[:3], 1): search_display += f"**{i}. [{result['title'][:50]}]({result['url']})**\n" search_display += f" {result['description'][:100]}...\n\n" # 사용자 메시지 추가 current_history = history + [{"role": "user", "content": message}] # 실시간 스트리밍을 위한 비동기 처리 async def stream_responses(): """실시간 스트리밍 제너레이터""" async for response, thoughts in system.parallel_process_agents( query=message, search_results=search_results, show_progress=show_agent_thoughts, lang=detected_lang ): yield response, thoughts # 새 이벤트 루프에서 실시간 스트리밍 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # 비동기 제너레이터를 동기적으로 순회 gen = stream_responses() while True: try: # 다음 항목 가져오기 task = asyncio.ensure_future(gen.__anext__(), loop=loop) response, thoughts = loop.run_until_complete(task) # 실시간 업데이트 updated_history = current_history + [ {"role": "assistant", "content": response} ] yield updated_history, thoughts, search_display except StopAsyncIteration: break except Exception as e: error_history = history + [ {"role": "user", "content": message}, {"role": "assistant", "content": f"❌ Error: {str(e)}"} ] yield error_history, "", "" finally: # 루프 정리 try: loop.close() except: pass # Gradio 인터페이스 with gr.Blocks( title="⚡ Speed-Optimized Multi-Agent System (Secure)", theme=gr.themes.Soft(), css=""" .gradio-container { max-width: 1400px !important; margin: auto !important; } """ ) as demo: gr.Markdown(""" # ⚡ Enhanced Multi-Agent RAG System (보안 강화 버전) **Complex questions processed within 5-8 seconds | Multi-language support | Model Info Protected** **Optimization Features:** - 🚀 Parallel Processing: Concurrent agent execution - ⚡ Stream Buffering: Network optimization - 🎯 Early Termination: Complete immediately when quality is met - 🌍 Multi-language: Auto-detect Korean/English/Japanese/Chinese - 🔒 **Security Enhanced**: AI 모델 정보 보호 활성화 - ❌ **Caching Disabled**: 캐싱 기능 제거됨 """) with gr.Row(): with gr.Column(scale=3): chatbot = gr.Chatbot( height=500, label="💬 Chat", type="messages" ) msg = gr.Textbox( label="Enter complex question", placeholder="Enter complex questions requiring analysis, strategy, or creative solutions...", lines=3 ) with gr.Row(): submit = gr.Button("⚡ High-Speed Process", variant="primary") clear = gr.Button("🔄 Reset") with gr.Accordion("🤖 Agent Processing", open=False): agent_thoughts = gr.Markdown() with gr.Accordion("📚 Search Sources", open=False): search_sources = gr.Markdown() with gr.Column(scale=1): gr.Markdown("**⚙️ Settings**") language_mode = gr.Radio( choices=["Auto", "Korean", "English", "Japanese", "Chinese"], value="Auto", label="🌍 Language Mode" ) use_search = gr.Checkbox( label="🔍 Use Web Search", value=True ) show_agent_thoughts = gr.Checkbox( label="🧠 Show Processing", value=True ) search_count = gr.Slider( minimum=3, maximum=10, value=5, step=1, label="Search Results Count" ) gr.Markdown(""" **⚡ Optimization Status** **Active Optimizations:** - ✅ Parallel Processing - ❌ ~~Smart Caching~~ (제거됨) - ✅ Buffer Streaming - ✅ Early Termination - ✅ Compressed Prompts - ✅ Multi-language Support - ✅ Error Recovery - 🔒 **Model Info Protection** **Security Features:** - 🔒 AI 모델명 숨김 - 🔒 버전 정보 보호 - 🔒 개발사 정보 차단 **Expected Processing Time:** - Simple Query: 3-5 seconds - Complex Query: 5-8 seconds - Very Complex: 8-12 seconds """) # 복잡한 질문 예제 (다국어) gr.Examples( examples=[ # Korean "AI 기술이 향후 10년간 한국 경제에 미칠 영향을 다각도로 분석하고 대응 전략을 제시해줘", "스타트업이 대기업과 경쟁하기 위한 혁신적인 전략을 단계별로 수립해줘", # English "Analyze the multifaceted impact of quantum computing on current encryption systems and propose alternatives", "Design 5 innovative business models for climate change mitigation with practical implementation details", # Japanese "メタバース時代の教育革新方案を実装可能なレベルで提案してください", # Chinese "分析人工智能对未来十年全球经济的影响并提出应对策略" ], inputs=msg ) # 이벤트 바인딩 submit.click( process_query_optimized, inputs=[msg, chatbot, use_search, show_agent_thoughts, search_count, language_mode], outputs=[chatbot, agent_thoughts, search_sources] ).then( lambda: "", None, msg ) msg.submit( process_query_optimized, inputs=[msg, chatbot, use_search, show_agent_thoughts, search_count, language_mode], outputs=[chatbot, agent_thoughts, search_sources] ).then( lambda: "", None, msg ) clear.click( lambda: ([], "", ""), None, [chatbot, agent_thoughts, search_sources] ) return demo # ============================================================================ # 메인 실행 # ============================================================================ if __name__ == "__main__": print(""" ╔══════════════════════════════════════════════════════════════╗ ║ ⚡ Speed-Optimized Multi-Agent System (Secure Version) ⚡ ║ ║ ║ ║ High-speed AI system with enhanced security features ║ ║ ║ ║ Features: ║ ║ • Multi-language support (KO/EN/JA/ZH) ║ ║ • Improved error recovery ║ ║ • NO CACHING (캐싱 기능 제거됨) ║ ║ • Adaptive stream buffering ║ ║ • Response cleaning & formatting ║ ║ • 🔒 MODEL INFO PROTECTION (모델 정보 보호) ║ ╚══════════════════════════════════════════════════════════════╝ """) # API 키 확인 if not os.getenv("FIREWORKS_API_KEY"): print("\n⚠️ FIREWORKS_API_KEY is not set.") if not os.getenv("BRAVE_SEARCH_API_KEY"): print("\n⚠️ BRAVE_SEARCH_API_KEY is not set.") # Gradio 앱 실행 demo = create_optimized_gradio_interface() is_hf_spaces = os.getenv("SPACE_ID") is not None if is_hf_spaces: print("\n🤗 Running in secure mode on Hugging Face Spaces...") demo.launch(server_name="0.0.0.0", server_port=7860) else: print("\n💻 Running in secure mode on local environment...") demo.launch(server_name="0.0.0.0", server_port=7860, share=False)