Spaces:
Running
Running
| # backendv1.py | |
| # VÉGLEGES, JAVÍTOTT VERZIÓ: Elastic Cloud és GitHub Secrets kompatibilis. | |
| # A RAG rendszer motorja: adatfeldolgozás, keresés, generálás és tanulás. | |
| # JAVÍTVA: A kategória-alapú szűrés ideiglenesen kikapcsolva a megbízhatóbb eredmények érdekében. | |
| import os | |
| import time | |
| import datetime | |
| import traceback | |
| import re | |
| from collections import defaultdict | |
| from elasticsearch import Elasticsearch, exceptions as es_exceptions | |
| import torch | |
| from sentence_transformers import SentenceTransformer | |
| from sentence_transformers.cross_encoder import CrossEncoder | |
| from spellchecker import SpellChecker | |
| from dotenv import load_dotenv | |
| import sys | |
| import nltk | |
| from concurrent.futures import ThreadPoolExecutor | |
| # Késleltetett importálás, hogy csak akkor legyen hiba, ha tényleg használjuk | |
| try: | |
| from together import Together | |
| TOGETHER_AVAILABLE = True | |
| except ImportError: | |
| TOGETHER_AVAILABLE = False | |
| # === ANSI Színkódok (konzol loggoláshoz) === | |
| GREEN = '\033[92m' | |
| YELLOW = '\033[93m' | |
| RED = '\033[91m' | |
| RESET = '\033[0m' | |
| BLUE = '\033[94m' | |
| CYAN = '\033[96m' | |
| MAGENTA = '\033[95m' | |
| # --- Konfiguráció --- | |
| # A hitelesítő adatok a környezeti változókból kerülnek beolvasásra. | |
| CONFIG = { | |
| "VECTOR_INDEX_NAMES": ["duna", "dunawebindexai"], | |
| "FEEDBACK_INDEX_NAME": "feedback_index", | |
| "ES_CLIENT_TIMEOUT": 90, | |
| "EMBEDDING_MODEL_NAME": 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2', | |
| "CROSS_ENCODER_MODEL_NAME": 'cross-encoder/mmarco-mMiniLMv2-L12-H384-v1', | |
| "TOGETHER_MODEL_NAME": "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", | |
| "QUERY_EXPANSION_MODEL": "mistralai/Mixtral-8x7B-Instruct-v0.1", | |
| "LLM_CLIENT_TIMEOUT": 120, | |
| "NUM_CONTEXT_RESULTS": 5, | |
| "RE_RANK_CANDIDATE_COUNT": 50, | |
| "RRF_RANK_CONSTANT": 60, | |
| "INITIAL_SEARCH_SIZE": 150, | |
| "KNN_NUM_CANDIDATES": 200, | |
| "MAX_GENERATION_TOKENS": 1024, | |
| "GENERATION_TEMPERATURE": 0.6, | |
| "USE_QUERY_EXPANSION": True, | |
| "SPELLCHECK_LANG": 'hu', | |
| "MAX_HISTORY_TURNS": 3 | |
| } | |
| # --- Segédfüggvények --- | |
| def correct_spellings(text, spell_checker_instance): | |
| if not spell_checker_instance: return text | |
| try: | |
| words = re.findall(r'\b\w+\b', text.lower()) | |
| misspelled = spell_checker_instance.unknown(words) | |
| if not misspelled: return text | |
| corrected_text = text | |
| for word in misspelled: | |
| correction = spell_checker_instance.correction(word) | |
| if correction and correction != word: | |
| corrected_text = re.sub(r'\b' + re.escape(word) + r'\b', corrected_text, flags=re.IGNORECASE) | |
| return corrected_text | |
| except Exception as e: | |
| print(f"{RED}Hiba a helyesírás javítása közben: {e}{RESET}") | |
| return text | |
| def get_query_category_with_llm(client, query): | |
| if not client: return 'egyéb' | |
| print(f" {CYAN}-> Lekérdezés kategorizálása LLM-mel...{RESET}") | |
| category_list = ['IT biztonsági szolgáltatások', 'szolgáltatások', 'hardver', 'szoftver', 'hírek', 'audiovizuális konferenciatechnika'] | |
| categories_text = ", ".join([f"'{cat}'" for cat in category_list]) | |
| prompt = f"""Adott egy felhasználói kérdés. Adj meg egyetlen, rövid kategóriát a következő listából, ami a legjobban jellemzi a kérdést. A válaszodban csak a kategória szerepeljen, más szöveg nélkül. | |
| Lehetséges kategóriák: {categories_text} | |
| Kérdés: '{query}' | |
| Kategória:""" | |
| messages = [{"role": "user", "content": prompt}] | |
| try: | |
| response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages, temperature=0.1, max_tokens=30) | |
| if response and response.choices: | |
| category = response.choices[0].message.content.strip().replace("'", "").replace("`", "") | |
| for cat in category_list: | |
| if cat.lower() in category.lower(): | |
| print(f" {GREEN}-> A kérdés LLM által generált kategóriája: '{cat}'{RESET}") | |
| return cat.lower() | |
| return 'egyéb' | |
| except Exception as e: | |
| print(f"{RED}Hiba LLM kategorizáláskor: {e}{RESET}") | |
| return 'egyéb' | |
| def expand_or_rewrite_query(original_query, client): | |
| final_queries = [original_query] | |
| if not (CONFIG["USE_QUERY_EXPANSION"] and client): | |
| return final_queries | |
| print(f" {BLUE}-> Lekérdezés bővítése/átírása...{RESET}") | |
| prompt = f"Adott egy magyar nyelvű felhasználói kérdés: '{original_query}'. Generálj 2 db alternatív, releváns keresőkifejezést. A válaszodban csak ezeket add vissza, vesszővel (,) elválasztva, minden más szöveg nélkül." | |
| messages = [{"role": "user", "content": prompt}] | |
| try: | |
| response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages, temperature=0.5, max_tokens=100) | |
| if response and response.choices: | |
| generated_text = response.choices[0].message.content.strip() | |
| alternatives = [q.strip().replace('"', '').replace("'", '').replace('.', '') for q in generated_text.split(',') if q.strip() and q.strip() != original_query] | |
| final_queries.extend(alternatives) | |
| print(f" {GREEN}-> Bővített lekérdezések: {final_queries}{RESET}") | |
| except Exception as e: | |
| print(f"{RED}Hiba a lekérdezés bővítése során: {e}{RESET}") | |
| return final_queries | |
| def run_separate_searches(es_client, query_text, embedding_model, expanded_queries, query_category=None): | |
| results = {'knn': {}, 'keyword': {}} | |
| es_client_with_timeout = es_client.options(request_timeout=CONFIG["ES_CLIENT_TIMEOUT"]) | |
| source_fields = ["text_content", "source_url", "summary", "category"] | |
| filters = [] | |
| ### JAVÍTÁS ### | |
| # A kategória-alapú szűrés ideiglenesen ki van kapcsolva, mert pontatlan | |
| # kategorizálás esetén drasztikusan rontja a találatok minőségét. | |
| # A keresés így a teljes adatbázisban fut, ami megbízhatóbb. | |
| # | |
| # if query_category and query_category != 'egyéb': | |
| # print(f" {MAGENTA}-> Kategória-alapú szűrés hozzáadása a kereséshez: '{query_category}'{RESET}") | |
| # filters.append({"match": {"category": query_category}}) | |
| def knn_search(index, query_vector): | |
| try: | |
| knn_query = {"field": "embedding", "query_vector": query_vector, "k": CONFIG["INITIAL_SEARCH_SIZE"], "num_candidates": CONFIG["KNN_NUM_CANDIDATES"], "filter": filters} | |
| response = es_client_with_timeout.search(index=index, knn=knn_query, _source=source_fields, size=CONFIG["INITIAL_SEARCH_SIZE"]) | |
| return index, response.get('hits', {}).get('hits', []) | |
| except Exception as e: | |
| print(f"{RED}Hiba kNN keresés során ({index}): {e}{RESET}") | |
| return index, [] | |
| def keyword_search(index, expanded_queries): | |
| try: | |
| should_clauses = [{"match": {"text_content": {"query": q, "operator": "OR", "fuzziness": "AUTO"}}} for q in expanded_queries] | |
| query_body = {"query": {"bool": {"should": should_clauses, "minimum_should_match": 1, "filter": filters}}} | |
| response = es_client_with_timeout.search(index=index, query=query_body['query'], _source=source_fields, size=CONFIG["INITIAL_SEARCH_SIZE"]) | |
| return index, response.get('hits', {}).get('hits', []) | |
| except Exception as e: | |
| print(f"{RED}Hiba kulcsszavas keresés során ({index}): {e}{RESET}") | |
| return index, [] | |
| query_vector = embedding_model.encode(query_text, normalize_embeddings=True).tolist() if embedding_model else None | |
| with ThreadPoolExecutor(max_workers=len(CONFIG["VECTOR_INDEX_NAMES"]) * 2) as executor: | |
| knn_futures = {executor.submit(knn_search, index, query_vector) for index in CONFIG["VECTOR_INDEX_NAMES"] if query_vector} | |
| keyword_futures = {executor.submit(keyword_search, index, expanded_queries) for index in CONFIG["VECTOR_INDEX_NAMES"]} | |
| for future in knn_futures: | |
| index, hits = future.result() | |
| results['knn'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)] | |
| for future in keyword_futures: | |
| index, hits = future.result() | |
| results['keyword'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)] | |
| total_knn_hits = sum(len(h) for h in results['knn'].values()) | |
| total_keyword_hits = sum(len(h) for h in results['keyword'].values()) | |
| print(f"{CYAN}Vektorkeresési találatok száma: {total_knn_hits}{RESET}") | |
| print(f"{CYAN}Kulcsszavas keresési találatok száma: {total_keyword_hits}{RESET}") | |
| return results | |
| def merge_results_rrf(search_results): | |
| rrf_scores = defaultdict(float) | |
| all_hits_data = {} | |
| for search_type in search_results: | |
| for index_name in search_results[search_type]: | |
| for rank, hit in search_results[search_type][index_name]: | |
| doc_id = hit['_id'] | |
| rrf_scores[doc_id] += 1.0 / (CONFIG["RRF_RANK_CONSTANT"] + rank) | |
| if doc_id not in all_hits_data: | |
| all_hits_data[doc_id] = hit | |
| combined_results = sorted([(doc_id, score, all_hits_data[doc_id]) for doc_id, score in rrf_scores.items()], key=lambda item: item[1], reverse=True) | |
| print(f"{CYAN}RRF által rangsorolt Top 5 pontszám: {[f'{score:.4f}' for doc_id, score, hit in combined_results[:5]]}{RESET}") | |
| return combined_results | |
| def retrieve_context_reranked(backend, query_text, confidence_threshold, fallback_message, query_category): | |
| expanded_queries = expand_or_rewrite_query(query_text, backend["llm_client"]) | |
| search_results = run_separate_searches(backend["es_client"], query_text, backend["embedding_model"], expanded_queries, query_category) | |
| merged_results = merge_results_rrf(search_results) | |
| if not merged_results: | |
| return fallback_message, [], None | |
| candidates_to_rerank = merged_results[:CONFIG["RE_RANK_CANDIDATE_COUNT"]] | |
| hits_data_for_reranking = [hit for _, _, hit in candidates_to_rerank] | |
| query_chunk_pairs = [[query_text, hit['_source'].get('summary', hit['_source'].get('text_content'))] for hit in hits_data_for_reranking if hit and '_source' in hit] | |
| ranked_by_ce = [] | |
| if backend["cross_encoder"] and query_chunk_pairs: | |
| ce_scores = backend["cross_encoder"].predict(query_chunk_pairs, show_progress_bar=False) | |
| ranked_by_ce = sorted(zip(ce_scores, hits_data_for_reranking), key=lambda x: x[0], reverse=True) | |
| print(f"{CYAN}Cross-Encoder pontszámok (Top 5):{RESET} {[f'{score:.4f}' for score, _ in ranked_by_ce[:5]]}") | |
| if not ranked_by_ce: | |
| return fallback_message, [], None | |
| top_score = float(ranked_by_ce[0][0]) | |
| if top_score < confidence_threshold: | |
| dynamic_fallback = (f"{fallback_message}\n\nA '{query_text}' kérdésre a legjobb találat megbízhatósági pontszáma ({top_score:.2f}) nem érte el a beállított küszöböt ({confidence_threshold:.2f}).") | |
| return dynamic_fallback, [], top_score | |
| final_hits_for_context = [hit for _, hit in ranked_by_ce[:CONFIG["NUM_CONTEXT_RESULTS"]]] | |
| context_parts = [hit['_source'].get('summary', hit['_source'].get('text_content')) for hit in final_hits_for_context] | |
| context_string = "\n\n---\n\n".join(context_parts) | |
| sources = [{"url": hit['_source'].get('source_url', '?'), "content": hit['_source'].get('text_content', 'N/A')} for hit in final_hits_for_context] | |
| return context_string, sources, top_score | |
| def generate_answer_with_history(client, model_name, messages, temperature): | |
| if not client: return "Hiba: Az AI kliens nincs inicializálva." | |
| try: | |
| response = client.chat.completions.create(model=model_name, messages=messages, temperature=temperature, max_tokens=CONFIG["MAX_GENERATION_TOKENS"], timeout=CONFIG["LLM_CLIENT_TIMEOUT"]) | |
| if response and response.choices: | |
| return response.choices[0].message.content.strip() | |
| return "Hiba: Nem érkezett érvényes válasz az AI modelltől." | |
| except Exception as e: | |
| print(f"{RED}Hiba a válasz generálásakor: {e}{RESET}") | |
| return "Hiba történt az AI modell hívásakor." | |
| def search_in_feedback_index(es_client, embedding_model, question, min_score=0.75): | |
| try: | |
| if not es_client.indices.exists(index=CONFIG["FEEDBACK_INDEX_NAME"]): return None, None | |
| embedding = embedding_model.encode(question, normalize_embeddings=True).tolist() | |
| knn_query = {"field": "embedding", "query_vector": embedding, "k": 1, "num_candidates": 10} | |
| response = es_client.search(index=CONFIG["FEEDBACK_INDEX_NAME"], knn=knn_query, _source=["question_text", "correction_text"]) | |
| hits = response.get('hits', {}).get('hits', []) | |
| if hits and hits[0]['_score'] >= min_score: | |
| top_hit = hits[0]; source = top_hit['_source']; score = top_hit['_score'] | |
| if score > 0.98: return "direct_answer", source['correction_text'] | |
| instruction = f"Egy nagyon hasonló kérdésre ('{source['question_text']}') korábban a következő javítást/iránymutatást adtad: '{source['correction_text']}'. A válaszodat elsősorban ez alapján alkosd meg!" | |
| return "instruction", instruction | |
| except Exception: | |
| return None, None | |
| return None, None | |
| def index_feedback(es_client, embedding_model, question, correction): | |
| try: | |
| embedding = embedding_model.encode(question, normalize_embeddings=True).tolist() | |
| doc = {"question_text": question, "correction_text": correction, "embedding": embedding, "timestamp": datetime.datetime.now()} | |
| es_client.index(index=CONFIG["FEEDBACK_INDEX_NAME"], document=doc) | |
| return True | |
| except Exception as e: | |
| print(f"{RED}Hiba a visszajelzés indexelése során: {e}{RESET}") | |
| return False | |
| def get_all_feedback(es_client, index_name): | |
| try: | |
| if not es_client.indices.exists(index=index_name): return [] | |
| response = es_client.search(index=index_name, query={"match_all": {}}, size=1000, sort=[{"timestamp": {"order": "desc"}}]) | |
| return response.get('hits', {}).get('hits', []) | |
| except Exception as e: | |
| print(f"{RED}Hiba a visszajelzések listázása során: {e}{RESET}") | |
| return [] | |
| def delete_feedback_by_id(es_client, index_name, doc_id): | |
| try: | |
| es_client.delete(index=index_name, id=doc_id) | |
| return True | |
| except Exception as e: | |
| print(f"{RED}Hiba a visszajelzés törlése során (ID: {doc_id}): {e}{RESET}") | |
| return False | |
| def update_feedback_comment(es_client, index_name, doc_id, new_comment): | |
| try: | |
| es_client.update(index=index_name, id=doc_id, doc={"correction_text": new_comment}) | |
| return True | |
| except Exception as e: | |
| print(f"{RED}Hiba a visszajelzés szerkesztése során (ID: {doc_id}): {e}{RESET}") | |
| return False | |
| def initialize_backend(): | |
| print("----- Backend Motor Inicializálása -----") | |
| load_dotenv() | |
| es_cloud_id = os.getenv("ES_CLOUD_ID") | |
| es_api_key = os.getenv("ES_API_KEY") | |
| together_api_key = os.getenv("TOGETHER_API_KEY") | |
| if not all([es_cloud_id, es_api_key, together_api_key]): | |
| print(f"{RED}Hiba: Hiányzó környezeti változók! Szükséges: ES_CLOUD_ID, ES_API_KEY, TOGETHER_API_KEY{RESET}") | |
| return None | |
| if not TOGETHER_AVAILABLE: | |
| print(f"{RED}Hiba: A 'together' csomag nincs telepítve.{RESET}") | |
| return None | |
| try: | |
| nltk.data.find('tokenizers/punkt') | |
| except LookupError: | |
| nltk.download('punkt', quiet=True) | |
| spell_checker = None | |
| try: | |
| spell_checker = SpellChecker(language=CONFIG["SPELLCHECK_LANG"]) | |
| custom_words = ["dunaelektronika", "kft", "outsourcing", "dell", "lenovo", "nis2", "szerver", "kliens", "hálózati", "hpe"] | |
| spell_checker.word_frequency.load_words(custom_words) | |
| except Exception as e: | |
| print(f"{RED}Helyesírás-ellenőrző hiba: {e}{RESET}") | |
| try: | |
| print(f"{CYAN}Elasticsearch kliens inicializálása...{RESET}") | |
| es_client = Elasticsearch(cloud_id=es_cloud_id, api_key=es_api_key, request_timeout=CONFIG["ES_CLIENT_TIMEOUT"]) | |
| if not es_client.ping(): raise ConnectionError("Elasticsearch ping sikertelen.") | |
| print(f"{GREEN}Elasticsearch kliens kész.{RESET}") | |
| print(f"{CYAN}AI modellek betöltése...{RESET}") | |
| device = 'cuda' if torch.cuda.is_available() else 'cpu' | |
| embedding_model = SentenceTransformer(CONFIG["EMBEDDING_MODEL_NAME"], device=device) | |
| cross_encoder = CrossEncoder(CONFIG["CROSS_ENCODER_MODEL_NAME"], device=device) | |
| llm_client = Together(api_key=together_api_key) | |
| print(f"{GREEN}AI modellek betöltve (eszköz: {device}).{RESET}") | |
| backend_objects = { | |
| "es_client": es_client, "embedding_model": embedding_model, "cross_encoder": cross_encoder, | |
| "llm_client": llm_client, "spell_checker": spell_checker | |
| } | |
| print(f"{GREEN}----- Backend Motor Készen Áll -----{RESET}") | |
| return backend_objects | |
| except Exception as e: | |
| print(f"{RED}Hiba a backend inicializálása során: {e}{RESET}") | |
| traceback.print_exc() | |
| return None | |
| def process_query(user_question, chat_history, backend, confidence_threshold, fallback_message): | |
| print(f"\n{BLUE}----- Új lekérdezés feldolgozása ----{RESET}") | |
| print(f"{BLUE}Kérdés: {user_question}{RESET}") | |
| corrected_question = correct_spellings(user_question, backend["spell_checker"]) | |
| print(f"{BLUE}Javított kérdés: {corrected_question}{RESET}") | |
| feedback_type, feedback_content = search_in_feedback_index(backend["es_client"], backend["embedding_model"], corrected_question) | |
| if feedback_type == "direct_answer": | |
| print(f"{GREEN}Direkt válasz a visszajelzési adatbázisból.{RESET}") | |
| return {"answer": feedback_content, "sources": [{"url": "Személyes visszajelzés alapján", "content": "Ez egy korábban megadott, pontosított válasz."}], "corrected_question": corrected_question, "confidence_score": 10.0} | |
| feedback_instructions = feedback_content if feedback_type == "instruction" else "" | |
| query_category = get_query_category_with_llm(backend["llm_client"], corrected_question) | |
| retrieved_context, sources, confidence_score = retrieve_context_reranked(backend, corrected_question, confidence_threshold, fallback_message, query_category) | |
| if not sources and not feedback_instructions: | |
| return {"answer": retrieved_context, "sources": [], "corrected_question": corrected_question, "confidence_score": confidence_score} | |
| system_prompt = f"""Te egy professzionális, segítőkész AI asszisztens vagy. | |
| A feladatod, hogy a KONTEXTUS-ból és a FEJLESZTŐI UTASÍTÁSOKBól származó információkat egyetlen, jól strukturált és ismétlés-mentes válasszá szintetizálld. | |
| {feedback_instructions} | |
| KRITIKUS SZABÁLY: Értékeld a kapott KONTEXTUS relevanciáját a felhasználó kérdéséhez képest. Ha egy kontextus-részlet nem kapcsolódik szorosan a kérdéshez, azt hagyd figyelmen kívül! | |
| FIGYELEM: Szigorúan csak a megadott KONTEXTUS-ra és a fejlesztői utasításokra támaszkodj. Ha a releváns információk alapján nem tudsz válaszolni, add ezt a választ: '{fallback_message}' | |
| KONTEXTUS: | |
| --- | |
| {retrieved_context if sources else "A tudásbázisban nem található releváns információ."} | |
| --- | |
| """ | |
| messages_for_llm = chat_history[-(CONFIG["MAX_HISTORY_TURNS"] * 2):] if chat_history else [] | |
| messages_for_llm.extend([{"role": "system", "content": system_prompt}, {"role": "user", "content": corrected_question}]) | |
| answer = generate_answer_with_history(backend["llm_client"], CONFIG["TOGETHER_MODEL_NAME"], messages_for_llm, CONFIG["GENERATION_TEMPERATURE"]) | |
| return {"answer": answer, "sources": sources, "corrected_question": corrected_question, "confidence_score": confidence_score} |