duna-chatbot-backend / backendv1.py
Király Zoltán
new
b5d1360
# backendv1.py
# VÉGLEGES, JAVÍTOTT VERZIÓ: Elastic Cloud és GitHub Secrets kompatibilis.
# A RAG rendszer motorja: adatfeldolgozás, keresés, generálás és tanulás.
# JAVÍTVA: A kategória-alapú szűrés ideiglenesen kikapcsolva a megbízhatóbb eredmények érdekében.
import os
import time
import datetime
import traceback
import re
from collections import defaultdict
from elasticsearch import Elasticsearch, exceptions as es_exceptions
import torch
from sentence_transformers import SentenceTransformer
from sentence_transformers.cross_encoder import CrossEncoder
from spellchecker import SpellChecker
from dotenv import load_dotenv
import sys
import nltk
from concurrent.futures import ThreadPoolExecutor
# Késleltetett importálás, hogy csak akkor legyen hiba, ha tényleg használjuk
try:
from together import Together
TOGETHER_AVAILABLE = True
except ImportError:
TOGETHER_AVAILABLE = False
# === ANSI Színkódok (konzol loggoláshoz) ===
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
RESET = '\033[0m'
BLUE = '\033[94m'
CYAN = '\033[96m'
MAGENTA = '\033[95m'
# --- Konfiguráció ---
# A hitelesítő adatok a környezeti változókból kerülnek beolvasásra.
CONFIG = {
"VECTOR_INDEX_NAMES": ["duna", "dunawebindexai"],
"FEEDBACK_INDEX_NAME": "feedback_index",
"ES_CLIENT_TIMEOUT": 90,
"EMBEDDING_MODEL_NAME": 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2',
"CROSS_ENCODER_MODEL_NAME": 'cross-encoder/mmarco-mMiniLMv2-L12-H384-v1',
"TOGETHER_MODEL_NAME": "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
"QUERY_EXPANSION_MODEL": "mistralai/Mixtral-8x7B-Instruct-v0.1",
"LLM_CLIENT_TIMEOUT": 120,
"NUM_CONTEXT_RESULTS": 5,
"RE_RANK_CANDIDATE_COUNT": 50,
"RRF_RANK_CONSTANT": 60,
"INITIAL_SEARCH_SIZE": 150,
"KNN_NUM_CANDIDATES": 200,
"MAX_GENERATION_TOKENS": 1024,
"GENERATION_TEMPERATURE": 0.6,
"USE_QUERY_EXPANSION": True,
"SPELLCHECK_LANG": 'hu',
"MAX_HISTORY_TURNS": 3
}
# --- Segédfüggvények ---
def correct_spellings(text, spell_checker_instance):
if not spell_checker_instance: return text
try:
words = re.findall(r'\b\w+\b', text.lower())
misspelled = spell_checker_instance.unknown(words)
if not misspelled: return text
corrected_text = text
for word in misspelled:
correction = spell_checker_instance.correction(word)
if correction and correction != word:
corrected_text = re.sub(r'\b' + re.escape(word) + r'\b', corrected_text, flags=re.IGNORECASE)
return corrected_text
except Exception as e:
print(f"{RED}Hiba a helyesírás javítása közben: {e}{RESET}")
return text
def get_query_category_with_llm(client, query):
if not client: return 'egyéb'
print(f" {CYAN}-> Lekérdezés kategorizálása LLM-mel...{RESET}")
category_list = ['IT biztonsági szolgáltatások', 'szolgáltatások', 'hardver', 'szoftver', 'hírek', 'audiovizuális konferenciatechnika']
categories_text = ", ".join([f"'{cat}'" for cat in category_list])
prompt = f"""Adott egy felhasználói kérdés. Adj meg egyetlen, rövid kategóriát a következő listából, ami a legjobban jellemzi a kérdést. A válaszodban csak a kategória szerepeljen, más szöveg nélkül.
Lehetséges kategóriák: {categories_text}
Kérdés: '{query}'
Kategória:"""
messages = [{"role": "user", "content": prompt}]
try:
response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages, temperature=0.1, max_tokens=30)
if response and response.choices:
category = response.choices[0].message.content.strip().replace("'", "").replace("`", "")
for cat in category_list:
if cat.lower() in category.lower():
print(f" {GREEN}-> A kérdés LLM által generált kategóriája: '{cat}'{RESET}")
return cat.lower()
return 'egyéb'
except Exception as e:
print(f"{RED}Hiba LLM kategorizáláskor: {e}{RESET}")
return 'egyéb'
def expand_or_rewrite_query(original_query, client):
final_queries = [original_query]
if not (CONFIG["USE_QUERY_EXPANSION"] and client):
return final_queries
print(f" {BLUE}-> Lekérdezés bővítése/átírása...{RESET}")
prompt = f"Adott egy magyar nyelvű felhasználói kérdés: '{original_query}'. Generálj 2 db alternatív, releváns keresőkifejezést. A válaszodban csak ezeket add vissza, vesszővel (,) elválasztva, minden más szöveg nélkül."
messages = [{"role": "user", "content": prompt}]
try:
response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages, temperature=0.5, max_tokens=100)
if response and response.choices:
generated_text = response.choices[0].message.content.strip()
alternatives = [q.strip().replace('"', '').replace("'", '').replace('.', '') for q in generated_text.split(',') if q.strip() and q.strip() != original_query]
final_queries.extend(alternatives)
print(f" {GREEN}-> Bővített lekérdezések: {final_queries}{RESET}")
except Exception as e:
print(f"{RED}Hiba a lekérdezés bővítése során: {e}{RESET}")
return final_queries
def run_separate_searches(es_client, query_text, embedding_model, expanded_queries, query_category=None):
results = {'knn': {}, 'keyword': {}}
es_client_with_timeout = es_client.options(request_timeout=CONFIG["ES_CLIENT_TIMEOUT"])
source_fields = ["text_content", "source_url", "summary", "category"]
filters = []
### JAVÍTÁS ###
# A kategória-alapú szűrés ideiglenesen ki van kapcsolva, mert pontatlan
# kategorizálás esetén drasztikusan rontja a találatok minőségét.
# A keresés így a teljes adatbázisban fut, ami megbízhatóbb.
#
# if query_category and query_category != 'egyéb':
# print(f" {MAGENTA}-> Kategória-alapú szűrés hozzáadása a kereséshez: '{query_category}'{RESET}")
# filters.append({"match": {"category": query_category}})
def knn_search(index, query_vector):
try:
knn_query = {"field": "embedding", "query_vector": query_vector, "k": CONFIG["INITIAL_SEARCH_SIZE"], "num_candidates": CONFIG["KNN_NUM_CANDIDATES"], "filter": filters}
response = es_client_with_timeout.search(index=index, knn=knn_query, _source=source_fields, size=CONFIG["INITIAL_SEARCH_SIZE"])
return index, response.get('hits', {}).get('hits', [])
except Exception as e:
print(f"{RED}Hiba kNN keresés során ({index}): {e}{RESET}")
return index, []
def keyword_search(index, expanded_queries):
try:
should_clauses = [{"match": {"text_content": {"query": q, "operator": "OR", "fuzziness": "AUTO"}}} for q in expanded_queries]
query_body = {"query": {"bool": {"should": should_clauses, "minimum_should_match": 1, "filter": filters}}}
response = es_client_with_timeout.search(index=index, query=query_body['query'], _source=source_fields, size=CONFIG["INITIAL_SEARCH_SIZE"])
return index, response.get('hits', {}).get('hits', [])
except Exception as e:
print(f"{RED}Hiba kulcsszavas keresés során ({index}): {e}{RESET}")
return index, []
query_vector = embedding_model.encode(query_text, normalize_embeddings=True).tolist() if embedding_model else None
with ThreadPoolExecutor(max_workers=len(CONFIG["VECTOR_INDEX_NAMES"]) * 2) as executor:
knn_futures = {executor.submit(knn_search, index, query_vector) for index in CONFIG["VECTOR_INDEX_NAMES"] if query_vector}
keyword_futures = {executor.submit(keyword_search, index, expanded_queries) for index in CONFIG["VECTOR_INDEX_NAMES"]}
for future in knn_futures:
index, hits = future.result()
results['knn'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)]
for future in keyword_futures:
index, hits = future.result()
results['keyword'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)]
total_knn_hits = sum(len(h) for h in results['knn'].values())
total_keyword_hits = sum(len(h) for h in results['keyword'].values())
print(f"{CYAN}Vektorkeresési találatok száma: {total_knn_hits}{RESET}")
print(f"{CYAN}Kulcsszavas keresési találatok száma: {total_keyword_hits}{RESET}")
return results
def merge_results_rrf(search_results):
rrf_scores = defaultdict(float)
all_hits_data = {}
for search_type in search_results:
for index_name in search_results[search_type]:
for rank, hit in search_results[search_type][index_name]:
doc_id = hit['_id']
rrf_scores[doc_id] += 1.0 / (CONFIG["RRF_RANK_CONSTANT"] + rank)
if doc_id not in all_hits_data:
all_hits_data[doc_id] = hit
combined_results = sorted([(doc_id, score, all_hits_data[doc_id]) for doc_id, score in rrf_scores.items()], key=lambda item: item[1], reverse=True)
print(f"{CYAN}RRF által rangsorolt Top 5 pontszám: {[f'{score:.4f}' for doc_id, score, hit in combined_results[:5]]}{RESET}")
return combined_results
def retrieve_context_reranked(backend, query_text, confidence_threshold, fallback_message, query_category):
expanded_queries = expand_or_rewrite_query(query_text, backend["llm_client"])
search_results = run_separate_searches(backend["es_client"], query_text, backend["embedding_model"], expanded_queries, query_category)
merged_results = merge_results_rrf(search_results)
if not merged_results:
return fallback_message, [], None
candidates_to_rerank = merged_results[:CONFIG["RE_RANK_CANDIDATE_COUNT"]]
hits_data_for_reranking = [hit for _, _, hit in candidates_to_rerank]
query_chunk_pairs = [[query_text, hit['_source'].get('summary', hit['_source'].get('text_content'))] for hit in hits_data_for_reranking if hit and '_source' in hit]
ranked_by_ce = []
if backend["cross_encoder"] and query_chunk_pairs:
ce_scores = backend["cross_encoder"].predict(query_chunk_pairs, show_progress_bar=False)
ranked_by_ce = sorted(zip(ce_scores, hits_data_for_reranking), key=lambda x: x[0], reverse=True)
print(f"{CYAN}Cross-Encoder pontszámok (Top 5):{RESET} {[f'{score:.4f}' for score, _ in ranked_by_ce[:5]]}")
if not ranked_by_ce:
return fallback_message, [], None
top_score = float(ranked_by_ce[0][0])
if top_score < confidence_threshold:
dynamic_fallback = (f"{fallback_message}\n\nA '{query_text}' kérdésre a legjobb találat megbízhatósági pontszáma ({top_score:.2f}) nem érte el a beállított küszöböt ({confidence_threshold:.2f}).")
return dynamic_fallback, [], top_score
final_hits_for_context = [hit for _, hit in ranked_by_ce[:CONFIG["NUM_CONTEXT_RESULTS"]]]
context_parts = [hit['_source'].get('summary', hit['_source'].get('text_content')) for hit in final_hits_for_context]
context_string = "\n\n---\n\n".join(context_parts)
sources = [{"url": hit['_source'].get('source_url', '?'), "content": hit['_source'].get('text_content', 'N/A')} for hit in final_hits_for_context]
return context_string, sources, top_score
def generate_answer_with_history(client, model_name, messages, temperature):
if not client: return "Hiba: Az AI kliens nincs inicializálva."
try:
response = client.chat.completions.create(model=model_name, messages=messages, temperature=temperature, max_tokens=CONFIG["MAX_GENERATION_TOKENS"], timeout=CONFIG["LLM_CLIENT_TIMEOUT"])
if response and response.choices:
return response.choices[0].message.content.strip()
return "Hiba: Nem érkezett érvényes válasz az AI modelltől."
except Exception as e:
print(f"{RED}Hiba a válasz generálásakor: {e}{RESET}")
return "Hiba történt az AI modell hívásakor."
def search_in_feedback_index(es_client, embedding_model, question, min_score=0.75):
try:
if not es_client.indices.exists(index=CONFIG["FEEDBACK_INDEX_NAME"]): return None, None
embedding = embedding_model.encode(question, normalize_embeddings=True).tolist()
knn_query = {"field": "embedding", "query_vector": embedding, "k": 1, "num_candidates": 10}
response = es_client.search(index=CONFIG["FEEDBACK_INDEX_NAME"], knn=knn_query, _source=["question_text", "correction_text"])
hits = response.get('hits', {}).get('hits', [])
if hits and hits[0]['_score'] >= min_score:
top_hit = hits[0]; source = top_hit['_source']; score = top_hit['_score']
if score > 0.98: return "direct_answer", source['correction_text']
instruction = f"Egy nagyon hasonló kérdésre ('{source['question_text']}') korábban a következő javítást/iránymutatást adtad: '{source['correction_text']}'. A válaszodat elsősorban ez alapján alkosd meg!"
return "instruction", instruction
except Exception:
return None, None
return None, None
def index_feedback(es_client, embedding_model, question, correction):
try:
embedding = embedding_model.encode(question, normalize_embeddings=True).tolist()
doc = {"question_text": question, "correction_text": correction, "embedding": embedding, "timestamp": datetime.datetime.now()}
es_client.index(index=CONFIG["FEEDBACK_INDEX_NAME"], document=doc)
return True
except Exception as e:
print(f"{RED}Hiba a visszajelzés indexelése során: {e}{RESET}")
return False
def get_all_feedback(es_client, index_name):
try:
if not es_client.indices.exists(index=index_name): return []
response = es_client.search(index=index_name, query={"match_all": {}}, size=1000, sort=[{"timestamp": {"order": "desc"}}])
return response.get('hits', {}).get('hits', [])
except Exception as e:
print(f"{RED}Hiba a visszajelzések listázása során: {e}{RESET}")
return []
def delete_feedback_by_id(es_client, index_name, doc_id):
try:
es_client.delete(index=index_name, id=doc_id)
return True
except Exception as e:
print(f"{RED}Hiba a visszajelzés törlése során (ID: {doc_id}): {e}{RESET}")
return False
def update_feedback_comment(es_client, index_name, doc_id, new_comment):
try:
es_client.update(index=index_name, id=doc_id, doc={"correction_text": new_comment})
return True
except Exception as e:
print(f"{RED}Hiba a visszajelzés szerkesztése során (ID: {doc_id}): {e}{RESET}")
return False
def initialize_backend():
print("----- Backend Motor Inicializálása -----")
load_dotenv()
es_cloud_id = os.getenv("ES_CLOUD_ID")
es_api_key = os.getenv("ES_API_KEY")
together_api_key = os.getenv("TOGETHER_API_KEY")
if not all([es_cloud_id, es_api_key, together_api_key]):
print(f"{RED}Hiba: Hiányzó környezeti változók! Szükséges: ES_CLOUD_ID, ES_API_KEY, TOGETHER_API_KEY{RESET}")
return None
if not TOGETHER_AVAILABLE:
print(f"{RED}Hiba: A 'together' csomag nincs telepítve.{RESET}")
return None
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
nltk.download('punkt', quiet=True)
spell_checker = None
try:
spell_checker = SpellChecker(language=CONFIG["SPELLCHECK_LANG"])
custom_words = ["dunaelektronika", "kft", "outsourcing", "dell", "lenovo", "nis2", "szerver", "kliens", "hálózati", "hpe"]
spell_checker.word_frequency.load_words(custom_words)
except Exception as e:
print(f"{RED}Helyesírás-ellenőrző hiba: {e}{RESET}")
try:
print(f"{CYAN}Elasticsearch kliens inicializálása...{RESET}")
es_client = Elasticsearch(cloud_id=es_cloud_id, api_key=es_api_key, request_timeout=CONFIG["ES_CLIENT_TIMEOUT"])
if not es_client.ping(): raise ConnectionError("Elasticsearch ping sikertelen.")
print(f"{GREEN}Elasticsearch kliens kész.{RESET}")
print(f"{CYAN}AI modellek betöltése...{RESET}")
device = 'cuda' if torch.cuda.is_available() else 'cpu'
embedding_model = SentenceTransformer(CONFIG["EMBEDDING_MODEL_NAME"], device=device)
cross_encoder = CrossEncoder(CONFIG["CROSS_ENCODER_MODEL_NAME"], device=device)
llm_client = Together(api_key=together_api_key)
print(f"{GREEN}AI modellek betöltve (eszköz: {device}).{RESET}")
backend_objects = {
"es_client": es_client, "embedding_model": embedding_model, "cross_encoder": cross_encoder,
"llm_client": llm_client, "spell_checker": spell_checker
}
print(f"{GREEN}----- Backend Motor Készen Áll -----{RESET}")
return backend_objects
except Exception as e:
print(f"{RED}Hiba a backend inicializálása során: {e}{RESET}")
traceback.print_exc()
return None
def process_query(user_question, chat_history, backend, confidence_threshold, fallback_message):
print(f"\n{BLUE}----- Új lekérdezés feldolgozása ----{RESET}")
print(f"{BLUE}Kérdés: {user_question}{RESET}")
corrected_question = correct_spellings(user_question, backend["spell_checker"])
print(f"{BLUE}Javított kérdés: {corrected_question}{RESET}")
feedback_type, feedback_content = search_in_feedback_index(backend["es_client"], backend["embedding_model"], corrected_question)
if feedback_type == "direct_answer":
print(f"{GREEN}Direkt válasz a visszajelzési adatbázisból.{RESET}")
return {"answer": feedback_content, "sources": [{"url": "Személyes visszajelzés alapján", "content": "Ez egy korábban megadott, pontosított válasz."}], "corrected_question": corrected_question, "confidence_score": 10.0}
feedback_instructions = feedback_content if feedback_type == "instruction" else ""
query_category = get_query_category_with_llm(backend["llm_client"], corrected_question)
retrieved_context, sources, confidence_score = retrieve_context_reranked(backend, corrected_question, confidence_threshold, fallback_message, query_category)
if not sources and not feedback_instructions:
return {"answer": retrieved_context, "sources": [], "corrected_question": corrected_question, "confidence_score": confidence_score}
system_prompt = f"""Te egy professzionális, segítőkész AI asszisztens vagy.
A feladatod, hogy a KONTEXTUS-ból és a FEJLESZTŐI UTASÍTÁSOKBól származó információkat egyetlen, jól strukturált és ismétlés-mentes válasszá szintetizálld.
{feedback_instructions}
KRITIKUS SZABÁLY: Értékeld a kapott KONTEXTUS relevanciáját a felhasználó kérdéséhez képest. Ha egy kontextus-részlet nem kapcsolódik szorosan a kérdéshez, azt hagyd figyelmen kívül!
FIGYELEM: Szigorúan csak a megadott KONTEXTUS-ra és a fejlesztői utasításokra támaszkodj. Ha a releváns információk alapján nem tudsz válaszolni, add ezt a választ: '{fallback_message}'
KONTEXTUS:
---
{retrieved_context if sources else "A tudásbázisban nem található releváns információ."}
---
"""
messages_for_llm = chat_history[-(CONFIG["MAX_HISTORY_TURNS"] * 2):] if chat_history else []
messages_for_llm.extend([{"role": "system", "content": system_prompt}, {"role": "user", "content": corrected_question}])
answer = generate_answer_with_history(backend["llm_client"], CONFIG["TOGETHER_MODEL_NAME"], messages_for_llm, CONFIG["GENERATION_TEMPERATURE"])
return {"answer": answer, "sources": sources, "corrected_question": corrected_question, "confidence_score": confidence_score}