Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ | |
| 高雄市統計智慧查詢應用程式 | |
| 版本:Hugging Face Spaces 適用版 | |
| 最後修改日期:2025-08-12 | |
| """ | |
| # ======================================================================= | |
| # 1. 匯入必要函式庫 | |
| # ======================================================================= | |
| import os | |
| import re | |
| import io | |
| import json | |
| import math | |
| import jieba | |
| import torch | |
| import msoffcrypto | |
| import gradio as gr | |
| import pandas as pd | |
| import google.generativeai as genai | |
| from typing import Type | |
| from typing import List, Dict | |
| from collections import defaultdict, OrderedDict | |
| # LangChain & SentenceTransformers | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from sentence_transformers import SentenceTransformer | |
| from langchain.retrievers import BM25Retriever | |
| # from langchain_community.retrievers import BM25Retriever | |
| from langchain.schema import Document | |
| # from langchain_core.schema import Document | |
| # from langchain_core.documents import Document | |
| from langchain_core.tools import tool | |
| # from langchain_core.tools import StructuredTool | |
| # ======================================================================= | |
| # 2. 初始設定與資料庫載入 | |
| # ======================================================================= | |
| # --- 全域變數與模型設定 --- | |
| EMBEDDING_MODEL_NAME = 'intfloat/multilingual-e5-base' | |
| DB_JB_PATH = "yearbook_contents_jb_db_base5" | |
| DB_SIM_PATH = "yearbook_contents_simple_db_base5" | |
| EXCEL_FILE_PATH = "合併檔案.xlsx" | |
| EXCEL_PASSWORD = os.getenv('open_key') | |
| _df_cache = None | |
| # --- Custom Embedding Class --- | |
| class CustomE5Embedding(HuggingFaceEmbeddings): | |
| def embed_documents(self, texts): | |
| texts = [f"passage: {t}" for t in texts] | |
| return super().embed_documents(texts) | |
| def embed_query(self, text): | |
| return super().embed_query(f"query: {text}") | |
| # --- 載入模型與向量資料庫 --- | |
| print("載入嵌入模型中...") | |
| embedding_model_st = SentenceTransformer(EMBEDDING_MODEL_NAME) | |
| embedding_model_lc = CustomE5Embedding(model_name=EMBEDDING_MODEL_NAME) | |
| print("載入向量資料庫中...") | |
| try: | |
| db_jb = FAISS.load_local(DB_JB_PATH, embedding_model_lc, allow_dangerous_deserialization=True) | |
| db_sim = FAISS.load_local(DB_SIM_PATH, embedding_model_lc, allow_dangerous_deserialization=True) | |
| print("✅ 向量資料庫載入成功。") | |
| except Exception as e: | |
| print(f"❌ 載入向量資料庫失敗,請確認檔案路徑是否正確: {e}") | |
| db_jb, db_sim = None, None | |
| # ======================================================================= | |
| # 3. 核心查詢與處理函式 | |
| # ======================================================================= | |
| def chinese_tokenizer(text: str) -> list[str]: | |
| return list(jieba.cut(text)) | |
| def extract_project_name_from_content(content: str) -> str: | |
| cleaned_content = re.sub(r"[\s\u3000]", "", content) | |
| match = re.search(r"項目[::]([^。]+)", cleaned_content) | |
| if match: | |
| raw_name = match.group(1) | |
| final_name = re.sub(r"^\d+", "", raw_name) | |
| return final_name.strip() | |
| return None | |
| def extract_project_names_from_rag_manual_mix(query: str, db_jb, db_sim, top_k: int = 4) -> List[str]: | |
| if not db_jb or not db_sim: | |
| return [] | |
| k_bm25 = math.ceil(top_k / 2) | |
| k_faiss = math.floor(top_k / 2) | |
| split_docs = list(db_jb.docstore._dict.values()) | |
| bm25 = BM25Retriever.from_documents(split_docs, tokenizer=chinese_tokenizer) | |
| bm25.k = 20 | |
| bm25_docs = bm25.get_relevant_documents(" ".join(jieba.cut(query))) | |
| bm25_names = [name for doc in bm25_docs if (name := extract_project_name_from_content(doc.page_content))] | |
| unique_bm25_names = list(OrderedDict.fromkeys(bm25_names))[:k_bm25] | |
| prefixed_query = f"query: {query}" | |
| vector_docs_with_scores = db_sim.similarity_search_with_score(prefixed_query, k=20) | |
| faiss_names = [name for doc, score in vector_docs_with_scores if (name := extract_project_name_from_content(doc.page_content))] | |
| unique_faiss_names = list(OrderedDict.fromkeys(faiss_names))[:k_faiss] | |
| combined_names = unique_bm25_names + unique_faiss_names | |
| return list(OrderedDict.fromkeys(combined_names))[:top_k] | |
| def load_data(file_path: str = EXCEL_FILE_PATH) -> pd.DataFrame: | |
| global _df_cache | |
| if _df_cache is not None: | |
| return _df_cache | |
| if not EXCEL_PASSWORD: | |
| print("❌ 錯誤:未提供 Excel 密碼。") | |
| return None | |
| try: | |
| print(f"解密並讀取 Excel 檔案中... ({file_path})") | |
| # 建立一個暫存的記憶體空間 | |
| decrypted_buffer = io.BytesIO() | |
| # 開啟加密檔案 | |
| with open(file_path, 'rb') as f: | |
| # 使用 msoffcrypto 進行解密 | |
| file = msoffcrypto.OfficeFile(f) | |
| file.load_key(password=EXCEL_PASSWORD) | |
| # 將解密後的內容寫入記憶體空間 | |
| file.decrypt(decrypted_buffer) | |
| # Pandas 從記憶體中讀取解密後的資料 | |
| _df_cache = pd.read_excel(decrypted_buffer) | |
| print("✅ Excel 資料載入成功。") | |
| return _df_cache | |
| except FileNotFoundError: | |
| print(f"❌ 錯誤:找不到檔案 {file_path}") | |
| return None | |
| except Exception as e: | |
| # 捕捉可能的錯誤,例如密碼錯誤 | |
| print(f"❌ 錯誤:無法讀取檔案,請檢查密碼是否正確或檔案是否損毀。") | |
| print(f"詳細錯誤訊息: {e}") | |
| return None | |
| def batch_find_relevant_tables(api_key: str, sub_queries: list[str], top_k: int = 1) -> dict: | |
| """ | |
| 為每個子問題獨立查找候選表,並將完整的配對結構交由 Gemini 判斷。 | |
| """ | |
| print("🧠 正在為每個子問題獨立查找其相對應候選表...") | |
| # --- Step 1: 為每個子問題獨立獲取候選表,並存入字典 --- | |
| query_to_candidates_map = {} | |
| for query in sub_queries: | |
| print(f" -> 正在處理: '{query}'") | |
| # 為每個子問題找回 20 個最相關的候選表 | |
| candidates_per_query = extract_project_names_from_rag_manual_mix(query, db_jb, db_sim, top_k=20) | |
| # test | |
| # print(candidates_per_query) | |
| if candidates_per_query: # 只加入有找到候選表的查詢 | |
| query_to_candidates_map[query] = candidates_per_query | |
| if not query_to_candidates_map: | |
| print("⚠️ RAG 步驟未找到任何候選資料表,終止批次匹配。") | |
| return {} | |
| print(f"✅ 已為 {len(query_to_candidates_map)} 個問題找到專屬候選表。") | |
| # --- Step 2: 動態建構一個新的、結構化的 Prompt --- | |
| # 建立任務描述文字 | |
| tasks_text_parts = [] | |
| for i, (query, candidates) in enumerate(query_to_candidates_map.items()): | |
| # 將候選表列表格式化 | |
| candidate_list_str = "\n".join(f" - {c}" for c in candidates) | |
| task_block = f""" | |
| --- | |
| [任務 {i+1}] | |
| 問題: "{query}" | |
| 此問題的候選資料表清單: | |
| {candidate_list_str} | |
| """ | |
| tasks_text_parts.append(task_block) | |
| tasks_text = "".join(tasks_text_parts) | |
| # 任務描述 | |
| batch_prompt = f""" | |
| 你是一個專業的數據庫助理。你的任務是從下方的「待處理的配對任務清單」中,根據每一個query,找出最相關的資料表。其餘捨棄。 | |
| 你必須依[輸出範例]回傳問題及表名,不要有任何多餘的文字、編號、引號或說明。 | |
| 你必須嚴格遵守以下規則: | |
| **[最優先規則:查詢總戶數與人口數]** | |
| * **條件**:當問題中包含「高雄市」或其下**任何一個「行政區」**的名稱(例如:左營區、楠梓區、杉林區等),並且詢問的是整體的**「人口數」、「總人口數」或「戶數」**時。 | |
| * **動作**:你 **必須** 將其對應到表名 `高雄市戶數、人口密度及性比例`。此規則的優先級最高。 | |
| **[一般規則]** | |
| * 對於不符合上述優先規則的其他問題,請根據問題的關鍵字(如:出生、死亡、結婚、離婚、遷入、遷出等)選擇最相關的資料表。 | |
| [待處理的配對任務清單]: | |
| {tasks_text} | |
| --- | |
| 請嚴格遵循以下 JSON 格式輸出,不要有任何多餘的文字或說明。 | |
| 輸出的 JSON 物件中,鍵(key)必須是原始問題,值(value)必須是您為該問題選擇的最佳表名。 | |
| [輸出範例]: | |
| {{ | |
| "113年三民區總人口數": "高雄市戶數、人口密度及性比例", | |
| "112年鼓山區出生人數": "高雄市嬰兒出生數" | |
| }} | |
| """.strip() | |
| # --- Step 3: 呼叫 Gemini 並解析結果 --- | |
| try: | |
| print("--- Structured Batch Prompt to Gemini ---") | |
| # print(repr(batch_prompt)) # 如需偵錯,可取消註解 | |
| print("---------------------------------------") | |
| response_text = reply(api_key, "", batch_prompt) | |
| parsed_json = extract_json(response_text) | |
| if isinstance(parsed_json, dict): | |
| print(f"✅ 結構化批次匹配表名成功: {parsed_json}") | |
| return parsed_json | |
| return {} | |
| except Exception as e: | |
| print(f"❌ 結構化批次匹配表名失敗: {e}") | |
| return {} | |
| def batch_parse_sub_queries_with_gemini(api_key: str, sub_queries: List[str]) -> Dict[str, Dict]: | |
| """ | |
| 批次解析所有子問題,提取時間、地區及查詢項目。 | |
| 回傳一個以子問題為鍵(key)的字典。 | |
| """ | |
| print(f"🤖 正在請求 Gemini 批次解析 {len(sub_queries)} 個子問題...") | |
| sub_queries_formatted = "\n".join([f"- {q}" for q in sub_queries]) | |
| prompt = f""" | |
| 你是一個高效率的數據查詢代理,請使用繁體中文回答。 | |
| 你的任務是分析使用者問題,並根據內容清晰度決定如何回應。 | |
| **情境一:問題清晰,可進行查詢** | |
| 如果問題中明確包含「時間」和「查詢項目」,請依照以下格式輸出: | |
| time_query: <時間文字> | |
| district_query: <地點文字> | |
| item_query: <地點文字>+<查詢項目文字> | |
| **重要規則:** | |
| 1. **時間正規化**:當使用者輸入的時間包含 "年底"、"年中"、"年初"、"年度" 等描述時,請將 `time_query` 正規化為年份。例如,"113年底" 應轉換為 "113年"。 | |
| 2. **時間校正**:當使用者輸入的時間包含 "學年"、"年底"、"年中"、"年初"、"年" 等描述時,如該問題是有關學校類型(大學、高中職、國中小、補習班等概況),請將 `time_query` 修正為學年。例如,"113年底" 應轉換為 "113學年"。 | |
| 3. `district_query` 為可選項目,若為空值則設為"高雄市全區"。如為"高雄市"或"高雄"等泛指整體者,亦設為"高雄市全區" | |
| 4. 請勿遺漏使用者輸入的任何關鍵詞。 | |
| **情境二:問題模糊,無法查詢** | |
| 如果問題中缺少「時間」或「查詢項目」任一資訊,導致無法進行查詢,請進行以下操作: | |
| 1. 直接回應以下文字,**並且不要生成 time_query 等欄位**。 | |
| `我不太了解你的意思,請重新定義問題(包含資料時間及統計指標)` | |
| --- | |
| 請嚴格遵循以上所有情境與規則,禁止加入多餘說明。 | |
| [子問題列表]: | |
| {sub_queries_formatted} | |
| --- | |
| [規則]: | |
| 1. **時間正規化**:當使用者輸入的時間包含 "年底"、"年中"、"年初"、"年度" 等描述時,請將 `time_query` 正規化為年份。例如,"113年底" 應轉換為 "113年"。 | |
| 2. **時間校正**:當使用者輸入的時間包含 "年底"、"年中"、"年初" 等描述時,如該問題是有關學校類型(國中小、補習班等概況),請將 `time_query` 修正為學年。例如,"113年" 應轉換為 "113學年"。 | |
| 3. `district_query` 為可選項目,若無則設為"高雄市全區"。如為"高雄市"或"高雄"等泛指整體者,亦設為"高雄市全區" | |
| 4. 請勿遺漏使用者輸入的任何關鍵詞。 | |
| --- | |
| [輸出格式]: | |
| 請嚴格遵循以下 JSON 格式輸出,不要有任何多餘的文字或說明。 | |
| 輸出的 JSON 物件中,鍵(key)必須是「子問題列表」中的原始問題,值(value)是一個包含解析參數的物件。 | |
| [輸出範例]: | |
| {{ | |
| "113年三民區總人口數": {{ | |
| "time_query": "113年", | |
| "district_query": "三民區", | |
| "item_query": "三民區總人口數" | |
| }}, | |
| "112年鼓山區出生人數": {{ | |
| "time_query": "112年", | |
| "district_query": "鼓山區", | |
| "item_query": "鼓山區出生人數" | |
| }} | |
| }} | |
| """.strip() | |
| try: | |
| response_text = reply(api_key, "", prompt) | |
| parsed_json = extract_json(response_text) | |
| if isinstance(parsed_json, dict): | |
| print("✅ 子問題批次解析成功。") | |
| return parsed_json | |
| return {} | |
| except Exception as e: | |
| print(f"❌ 批次解析子問題失敗: {e}") | |
| return {} | |
| # --- 動態查詢工具 --- | |
| def semantic_query_logic(time_query: str, item_query: str, project_name: str, district_query: str = "") -> str: | |
| """ | |
| 直接以已匹配好的表名、時間及統計指標查詢合併資料。 | |
| """ | |
| print(f"--- 執行查詢: 表名='{project_name}', 時間='{time_query}', 地區='{district_query}', 項目='{item_query}' ---") | |
| df = load_data() | |
| if df is None: return "[]" | |
| # 步驟 1: 先用精確的表名進行篩選,大幅縮小範圍 | |
| filtered_df = df[df['表名'] == project_name].copy() | |
| if filtered_df.empty: | |
| # 如果表名就找不到任何資料,直接返回 | |
| return "[]" | |
| # 步驟 2: 在已縮小的範圍內,進行時間和地區的篩選 | |
| conditions = [] | |
| if time_query: | |
| conditions.append(filtered_df['表側資訊'].astype(str).str.contains(time_query, na=False) | filtered_df['表首資訊'].astype(str).str.contains(time_query, na=False)) | |
| if district_query: | |
| conditions.append(filtered_df['表側資訊'].astype(str).str.contains(district_query, na=False) | filtered_df['表首資訊'].astype(str).str.contains(district_query, na=False)) | |
| if conditions: | |
| combined_condition = pd.concat([cond.rename(i) for i, cond in enumerate(conditions)], axis=1).all(axis=1) | |
| filtered_df = filtered_df[combined_condition] | |
| if filtered_df.empty: return "[]" | |
| # 關鍵安全閥 | |
| MAX_CANDIDATES = 1300 | |
| if len(filtered_df) > MAX_CANDIDATES: | |
| print(f"⚠️ 篩選結果超過{MAX_CANDIDATES}筆({len(filtered_df)}),僅取前{MAX_CANDIDATES}筆進行向量分析以節省資源。") | |
| filtered_df = filtered_df.head(MAX_CANDIDATES) | |
| # 步驟 3: 對最終篩選出的結果進行向量化與語意比對 | |
| print(f"向量化階段:對 {len(filtered_df)} 筆資料進行向量化...") | |
| combined_texts = (filtered_df['表名'].astype(str) + " " + filtered_df['表首資訊'].astype(str) + " " + filtered_df['表側資訊'].astype(str)).tolist() | |
| # 加上前綴 | |
| prefixed_passage_texts = [f"passage: {t}" for t in combined_texts] | |
| prefixed_query_text = f"query: {item_query}" | |
| item_query_embedding = embedding_model_st.encode(prefixed_query_text, convert_to_tensor=True) | |
| candidate_embeddings = embedding_model_st.encode( prefixed_passage_texts, convert_to_tensor=True) | |
| semantic_scores = torch.matmul(item_query_embedding, candidate_embeddings.T).tolist() | |
| print(" ✅ 向量化完成。") | |
| # 步驟 4: 根據語意分數排序並產生最終結果 | |
| results = [] | |
| for i, row in enumerate(filtered_df.itertuples(index=False)): | |
| results.append({**row._asdict(), "語意分數": round(semantic_scores[i], 4)}) | |
| results.sort(key=lambda x: x['語意分數'], reverse=True) | |
| FINAL_K = 25 | |
| top_results = results[:FINAL_K] | |
| print("--- semantic_query_logic 執行完畢 ---") | |
| return json.dumps(top_results, ensure_ascii=False) if top_results else "[]" | |
| # 2. 從上面的普通函式,明確地建立 LangChain 工具 | |
| from langchain.tools import StructuredTool | |
| semantic_query_tool = StructuredTool.from_function( | |
| func=semantic_query_logic, | |
| name="semantic_query_tool", | |
| description="直接使用向量語意模型進行檢索排序。" | |
| ) | |
| # ======================================================================= | |
| # 4. 主要執行流程 (Reflect) | |
| # ======================================================================= | |
| system_reviewer = """ | |
| 你是語意分析專家,請將使用者的複雜問題拆解成具體子問題,並判斷每個子問題的查詢類型。 | |
| ⚠️ 拆解前請先檢查以下條件: | |
| 1. 涉及高雄市以外或全國性資料,請直接回傳:「抱歉~我是高雄市查詢機器人,無法查詢高雄以外資料。」 | |
| 2. 未提及明確時間(如112年、113年3月),請直接回傳:「抱歉~請問查詢的資料時間。」 | |
| 📌 明確時間=出現「具體年份」、「年月」、「季」或「學年」。模糊詞(平均、近年、目前、歷年等)皆視為未指定。 | |
| --- | |
| ### 規則與指令 | |
| 👑 最高優先級規則:行政區關鍵詞擴展 | |
| - **條件**:當問題中明確出現 **"各行政區"**、**"所有行政區"** 或 **"全體行政區"** 時,此規則優先級最高。 | |
| - **動作**:你 **必須** 將該問題擴展為 38 個獨立的子問題,每個子問題對應一個高雄市的行政區。擴展的方法是:將原始問題中的關鍵詞(例如 "各行政區")精確替換為下方列表中的每一個行政區名稱。 | |
| - **[輸入範例]**:「113年各行政區總人口數」 | |
| - **[輸出範例]**: | |
| [ | |
| { "sub_query": "113年鹽埕區總人口數", "type": "direct" }, | |
| { "sub_query": "113年鼓山區總人口數", "type": "direct" }, | |
| { "sub_query": "113年左營區總人口數", "type": "direct" }, | |
| ... (依此類推,直到最後一個行政區) ..., | |
| { "sub_query": "113年那瑪夏區總人口數", "type": "direct" } | |
| ] | |
| - **行政區列表**:["鹽埕區", "鼓山區", "左營區", "楠梓區", "三民區", "新興區", "前金區", "苓雅區", "前鎮區", "旗津區", "小港區", "鳳山區", "林園區", "大寮區", "大樹區", "大社區", "仁武區", "鳥松區", "岡山區", "橋頭區", "燕巢區", "田寮區", "阿蓮區", "路竹區", "湖內區", "茄萣區", "永安區", "彌陀區", "梓官區", "旗山區", "美濃區", "六龜區", "甲仙區", "杉林區", "內門區", "茂林區", "桃源區", "那瑪夏區"] | |
| --- | |
| ### 其他規則 | |
| 📌 子問題拆解規則: | |
| - 對於不符合上述「最高優先級規則」的問題,依此規則處理。 | |
| - 每個子問題必須包含 1 個「地點」、1 個「時間」、1 個「指標」。 | |
| - 若同時包含多個時間、地區或指標,請拆成多筆(如:110-113年、1至3月 都要拆開)。 | |
| - 若內容有年齡區間如20-24歲,則不必拆分。 | |
| - ⛔ 禁止省略使用者輸入中的任何關鍵詞(例如:「人口數合計」的「合計」也不得省略)。 | |
| 📌 回傳格式(**僅限 JSON 陣列**,不得加上任何文字): | |
| [ | |
| { | |
| "sub_query": "子問題內容(不得遺漏任何原始資訊)", | |
| "type": "direct" 或 "comparison" | |
| # } | |
| # ] | |
| 📌 類型說明: | |
| - 可直接查詢者為 "direct"。 | |
| - 涉及比較、推論、排序者為 "comparison"。 | |
| --- | |
| 📤 輸出要求: | |
| - 嚴格遵循 JSON 陣列格式,禁止加上任何說明文字或```json ```標籤。 | |
| - 禁用「...」,所有子問題必須完整列出。 | |
| - 回應必須為繁體中文。 | |
| """ | |
| system_integration = """ | |
| 你是資深資料分析師,擅長回答高雄市統計問題。請依下列規則,根據使用者提問與查詢資料,產出清楚、正確的繁體中文答案。 | |
| ### 🎯 使用者問題: | |
| {user_query} | |
| ### 📊 查詢資料: | |
| {retrieved_chunks} | |
| --- | |
| ## 🧩 問題類型與處理方式: | |
| ### 一、比較型問題(如「最多」「變化」「排名」「哪區最高」): | |
| 1. **對應條件**:僅使用與問題一致的「時間、地區、指標」。 | |
| 2. **缺漏處理**:若資料不齊,請指出缺哪一項與無法比較的原因。 | |
| 3. **數值處理**:轉為千分位(例:23,000)、百分比與金額取至小數第 2 位。 | |
| 4. **條列推論**:逐項列出比較結果,明確指出最高、最低、差異。 | |
| 5. **禁止**:不得使用科學記號、英文、原欄位名稱;不得補資料或推論未查到的年份。 | |
| ### 二、一般整合型問題(如「113年底苓雅區人口?」): | |
| 1. **條件驗證**:若資料年份不同,請說明「您問的是 113 年,我找到的是 114 年…」。 | |
| 2. **缺資料處理**:無資料請說「資料缺乏,無法回答」;不可用其他時間資料代替。 | |
| 3. **作答格式**:300 字內、結論先行、條列清楚、千分位數字,不使用科學記號。開頭統一:「關於您提出的問題,綜合參考資料如下:」,結尾列出參考資料表名:「參考資料:高雄市原住民戶口數」。 | |
| --- | |
| ## 📌 共通禁止事項(適用所有問題): | |
| - ❌ 不得推論或補未查到的資料 | |
| - ❌ 不可引用不符問題條件的數據 | |
| - ❌ 不可貼欄位原文、英文、代碼、科學記號 | |
| --- | |
| ## ✅ 輸出格式: | |
| - 使用繁體中文 | |
| - **請使用 Markdown 格式化您的回覆,例如使用粗體標示重點數字、使用項目符號條列結果。** | |
| - 逐項列出的結果請由大到小排序。 | |
| - 數值一律轉為千分位 | |
| - 每份資料來源僅列一次 | |
| - 若缺資料請誠實說明並結束回答 | |
| 請根據以上規則,輸出準確答案。 | |
| """ | |
| def reply(api_key: str, system: str, prompt: str, model: str = "gemini-2.0-flash-lite"): | |
| """ | |
| 獲取 Gemini 回應。 | |
| """ | |
| try: | |
| genai.configure(api_key=api_key) | |
| if system and system.strip(): | |
| gemini_model = genai.GenerativeModel(model_name=model, system_instruction=system) | |
| else: | |
| # 如果 system 是空的,則不傳遞 system_instruction 參數 | |
| gemini_model = genai.GenerativeModel(model_name=model) | |
| response = gemini_model.generate_content(prompt, generation_config={'temperature': 0}) | |
| # 直接回傳完整的文字 | |
| return response.text | |
| except Exception as e: | |
| error_message = f"系統錯誤:呼叫 Gemini API 失敗。錯誤詳情: {e}" | |
| print(error_message) | |
| return error_message | |
| def extract_json(text: str) -> list | dict: | |
| json_block_match = re.search(r'```json\s*([\s\S]*?)\s*```|([\s\S]*)', text) | |
| if not json_block_match: raise ValueError("在回傳內容中找不到任何可解析的文字。") | |
| content = json_block_match.group(1) or json_block_match.group(2) | |
| start = content.find('[') if content.find('[') != -1 else content.find('{') | |
| end = content.rfind(']') if content.rfind(']') != -1 else content.rfind('}') | |
| if start == -1 or end == -1 or end < start: | |
| raise ValueError("在回傳內容中找不到有效的 JSON 結構。") | |
| json_text = content[start : end + 1] | |
| json_text = re.sub(r',\s*([\}\]])', r'\1', json_text) | |
| try: | |
| return json.loads(json_text) | |
| except json.JSONDecodeError as e: | |
| raise ValueError(f"清理後仍然無法解析 JSON。原始錯誤: {e}") | |
| # ======================================================================= | |
| # 5. Gradio Web UI | |
| # ======================================================================= | |
| def gradio_interface(user_input): | |
| """Gradio 主要處理函式""" | |
| api_key = os.getenv('Gemini') | |
| api_key2 = os.getenv('Gemini2') | |
| api_key3 = os.getenv('Gemini3') | |
| api_key4 = os.getenv('Gemini4') | |
| if not api_key: | |
| return "❌ 查詢失敗", "錯誤:未在伺服器環境中設定 'Gemini' API 金鑰。" | |
| # 檢查向量資料庫是否成功載入 | |
| if not db_jb or not db_sim: | |
| return "❌ 系統錯誤", "向量資料庫未成功載入,請檢查伺服器日誌。" | |
| # --- 開始逐步回饋 --- | |
| try: | |
| # Step 1:拆解子問題 | |
| yield "", "🔍 正在分析您的問題..." | |
| decomposed_text = reply(api_key, system_reviewer, user_input) | |
| if "抱歉~" in decomposed_text: | |
| yield "", decomposed_text | |
| return | |
| parsed_list = extract_json(decomposed_text) | |
| direct_queries = [item for item in parsed_list if item.get("type") == "direct"] | |
| if not direct_queries: | |
| yield "", "⚠️ 無法從輸入問題中擷取有效查詢項目。" | |
| return | |
| all_querys_summary = "🔹 關鍵查詢:\n" + "\n".join(f"- {q['sub_query']}" for q in direct_queries) | |
| yield all_querys_summary, "✅ 問題分析完成,正在匹配資料表..." | |
| # Step 2 & 3:批次匹配與解析 | |
| sub_query_texts = [q["sub_query"] for q in direct_queries] | |
| table_map = batch_find_relevant_tables(api_key2, sub_query_texts) | |
| if not table_map: | |
| yield all_querys_summary, "⚠️ 系統無法為您的查詢匹配到合適的資料表。" | |
| return | |
| yield all_querys_summary, "📚 資料表匹配成功,正在解析查詢參數..." | |
| params_map = batch_parse_sub_queries_with_gemini(api_key3, sub_query_texts) | |
| if not params_map: | |
| yield all_querys_summary, "⚠️ 系統無法解析您問題中的查詢參數。" | |
| return | |
| # Step 4:執行查詢 | |
| yield all_querys_summary, "⏳ 正在從資料庫中檢索資訊..." | |
| context_list = [] | |
| for q in direct_queries: | |
| sub_query = q["sub_query"] | |
| table_name = table_map.get(sub_query) | |
| params = params_map.get(sub_query) | |
| if not table_name or not params: | |
| context_list.append(f"【{sub_query}】\n查詢失敗:未能匹配到資料表或解析參數。") | |
| continue | |
| try: | |
| # 帶著所有精準參數執行查詢 | |
| result_json = semantic_query_logic( | |
| time_query=params.get("time_query", ""), | |
| item_query=params.get("item_query", ""), | |
| district_query=params.get("district_query", ""), | |
| project_name=table_name | |
| ) | |
| result_data = json.loads(result_json) | |
| if result_data: | |
| formatted_result = json.dumps(result_data, ensure_ascii=False, indent=2) | |
| context_list.append(f"【{sub_query}】\n查詢結果:\n{formatted_result}") | |
| else: | |
| context_list.append(f"【{sub_query}】\n查無資料。") | |
| except Exception as e: | |
| context_list.append(f"【{sub_query}】\n查詢失敗:{e}") | |
| combined_context = "\n\n".join(context_list) | |
| # Step 5:整合分析 | |
| yield all_querys_summary, "✍️ 查詢完成,正在生成最終回覆..." | |
| integration_prompt = f"使用者問題:{user_input}\n\n查詢資料如下:\n{combined_context}" | |
| integration_result = reply(api_key4, system_integration, integration_prompt) | |
| yield all_querys_summary, integration_result # 最終結果 | |
| except Exception as e: | |
| import traceback | |
| print(traceback.format_exc()) | |
| yield "❌ 查詢失敗", f"發生未預期的系統錯誤:{str(e)}" | |
| # --- UI 介面定義 --- | |
| # 1. 定義自訂 CSS 樣式 | |
| custom_css = """ | |
| /* --- 基本樣式 (淺色與夜間模式共用) --- */ | |
| #output_markdown { | |
| padding: 15px; | |
| border-radius: 8px; | |
| min-height: 150px; | |
| max-height: 400px; | |
| overflow-y: auto; | |
| box-shadow: inset 0 2px 4px 0 rgba(0,0,0,0.05); | |
| /* --- 預設為淺色模式的樣式 --- */ | |
| background: #ffffff !important; /* 淺色模式下的背景 */ | |
| color: #374151; /* 淺色模式下的文字顏色 (深灰色) */ | |
| } | |
| /* --- 夜間模式下的覆蓋樣式 --- */ | |
| /* 當系統或瀏覽器處於夜間模式時,以下規則會生效 */ | |
| @media (prefers-color-scheme: dark) { | |
| #output_markdown { | |
| background: #1f2937 !important; /* 夜間模式下的背景 (深藍灰色) */ | |
| color: #d1d5db !important; /* 夜間模式下的文字顏色 (淺灰色) */ | |
| } | |
| } | |
| """ | |
| # 2. 在 gr.Blocks 中載入 CSS | |
| with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="orange"), css=custom_css) as demo: | |
| gr.Markdown( | |
| """ | |
| # 🤖 高雄市公務統計資料智慧查詢 | |
| 歡迎使用!您可以透過自然語言提出關於高雄市的公務統計問題,系統將盡力為您查找相關資訊。 <br> | |
| 本系統運作於2v cpu & 16 GB RAM 免費資源,系統速度稍慢(產生結果時間依問題複雜度而定,一般為10至15秒)。 <br> | |
| **本智慧查詢資料範圍:104-113年高雄市;113年各行政區。** | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| user_input_box = gr.Textbox( | |
| label="請在此輸入您的問題", | |
| placeholder="例如:113年底前金區人口數?(小技巧:查不到資料時可轉換問法)", | |
| lines=5 | |
| ) | |
| gr.Examples( | |
| examples=[ | |
| "113年底前金區人口數?", | |
| "110-113年高雄市總人口數,及其趨勢?", | |
| "110-113年失業率情形", | |
| "113學年國小一年級學生人數", | |
| "113年底鹽埕區、三民區、前鎮區、林園區及美濃區人口數", | |
| "107年勞工教育班數", | |
| "112年路竹區門診體驗人數", | |
| "113學年國立大學教師數", | |
| "105年公共污水下水道當年接管戶數", | |
| "113年三民區人口數及該區身心障礙人數", | |
| "113年高雄市各行政區中哪一區人口最多" | |
| ], | |
| inputs=user_input_box, | |
| label="💡 範例問題" | |
| ) | |
| with gr.Row(): | |
| btn_clear = gr.ClearButton(value="清除") | |
| btn_submit = gr.Button("送出查詢", variant="primary") | |
| with gr.Column(scale=1): | |
| output_analysis = gr.Textbox( | |
| label="🌟 問題分析", | |
| interactive=False, | |
| lines=5, | |
| visible=False, | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("### 🧐 查詢結果") | |
| output_result = gr.Markdown( | |
| # label="🧐 查詢結果", | |
| # show_label=True, | |
| elem_id="output_markdown" | |
| ) | |
| gr.Markdown( | |
| """ | |
| --- | |
| *資料來源:[高雄市政府主計處](https://kcgdg.kcg.gov.tw/StatWebRWD/Page/Default.aspx)* <br> | |
| *本工具由 AI 驅動,查詢結果僅供參考。* | |
| """ | |
| ) | |
| # --- 事件綁定 --- | |
| outputs_list = [output_analysis, output_result] | |
| btn_submit.click(fn=gradio_interface, inputs=user_input_box, outputs=outputs_list) | |
| user_input_box.submit(fn=gradio_interface, inputs=user_input_box, outputs=outputs_list) | |
| btn_clear.add([user_input_box] + outputs_list) | |
| # --- 啟動應用程式 --- | |
| if __name__ == "__main__": | |
| load_data() # 預先載入 Excel 資料到快取 | |
| demo.launch(debug=True) |