duna-chatbot-backend / backendv1.py
darkisz
Add files via upload
92f70a2 unverified
raw
history blame
25.3 kB
# backendv1.py
# A RAG rendszer motorja: adatfeldolgozás, keresés, generálás és tanulás.
# Végleges, refaktorált verzió. Gyors, egylépcsős generálással.
import os
import time
import datetime
import json
import re
from collections import defaultdict
from together import Together
from elasticsearch import Elasticsearch, exceptions as es_exceptions
import torch
from sentence_transformers import SentenceTransformer
from sentence_transformers.cross_encoder import CrossEncoder
from spellchecker import SpellChecker
import warnings
from dotenv import load_dotenv
import sys
import nltk
from concurrent.futures import ThreadPoolExecutor
# === ANSI Színkódok (konzol loggoláshoz) ===
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
RESET = '\033[0m'
BLUE = '\033[94m'
CYAN = '\033[96m'
MAGENTA = '\033[95m'
# --- Konfiguráció ---
CONFIG = {
"ELASTIC_PASSWORD": os.environ.get("ES_PASSWORD", "T8xEbqQ4GAPkr73s2knN"),
"ELASTIC_HOST": "https://localhost:9200",
"VECTOR_INDEX_NAMES": ["duna", "dunawebindexai"],
"FEEDBACK_INDEX_NAME": "feedback_index",
"ES_CLIENT_TIMEOUT": 90,
"EMBEDDING_MODEL_NAME": 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2',
"CROSS_ENCODER_MODEL_NAME": 'cross-encoder/mmarco-mMiniLMv2-L12-H384-v1',
"TOGETHER_MODEL_NAME": "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
"QUERY_EXPANSION_MODEL": "mistralai/Mixtral-8x7B-Instruct-v0.1",
"LLM_CLIENT_TIMEOUT": 120,
"NUM_CONTEXT_RESULTS": 5,
"RE_RANK_CANDIDATE_COUNT": 50,
"RRF_RANK_CONSTANT": 60,
"INITIAL_SEARCH_SIZE": 150,
"KNN_NUM_CANDIDATES": 200,
"MAX_GENERATION_TOKENS": 1024,
"GENERATION_TEMPERATURE": 0.6,
"USE_QUERY_EXPANSION": True,
"SPELLCHECK_LANG": 'hu',
"MAX_HISTORY_TURNS": 3,
"HUNGARIAN_STOP_WORDS": set(
["a", "az", "egy", "és", "hogy", "ha", "is", "itt", "ki", "mi", "mit", "mikor", "hol", "hogyan", "nem", "ne",
"de", "csak", "meg", "megint", "már", "mint", "még", "vagy", "valamint", "van", "volt", "lesz", "kell",
"kellett", "lehet", "tud", "tudott", "fog", "fogja", "azt", "ezt", "ott", "ő", "ők", "én", "te", "mi", "ti",
"ön", "önök", "maga", "maguk", "ilyen", "olyan", "amely", "amelyek", "aki", "akik", "ahol", "amikor", "mert",
"ezért", "akkor", "így", "úgy", "pedig", "illetve", "továbbá", "azonban", "hanem", "viszont", "nélkül",
"alatt", "felett", "között", "előtt", "után", "mellett", "bele", "be", "fel", "le", "át", "szembe", "együtt",
"mindig", "soha", "gyakran", "néha", "talán", "esetleg", "biztosan", "nagyon", "kicsit", "éppen", "most",
"majd", "azután", "először", "utoljára", "igen", "sem", "túl", "kivéve", "szerint"])
}
# --- Segédfüggvények ---
def correct_spellings(text, spell_checker_instance):
"""
Kijavítja a helyesírási hibákat a szövegben.
"""
if not spell_checker_instance:
return text
try:
words = re.findall(r'\b\w+\b', text.lower())
misspelled = spell_checker_instance.unknown(words)
if not misspelled:
return text
corrected_text = text
for word in misspelled:
correction = spell_checker_instance.correction(word)
if correction and correction != word:
corrected_text = re.sub(r'\b' + re.escape(word) + r'\b', correction, corrected_text,
flags=re.IGNORECASE)
return corrected_text
except Exception as e:
print(f"{RED}Hiba a helyesírás javítása közben: {e}{RESET}")
return text
def get_query_category_with_llm(client, query):
"""
LLM-et használ a felhasználói kérdés kategorizálására, előre definiált listából választva.
"""
if not client:
return None
print(f" {CYAN}-> Lekérdezés kategorizálása LLM-mel...{RESET}")
category_list = ['IT biztonsági szolgáltatások', 'szolgáltatások', 'hardver', 'szoftver', 'hírek',
'audiovizuális konferenciatechnika']
categories_text = ", ".join([f"'{cat}'" for cat in category_list])
prompt = f"""Adott egy felhasználói kérdés. Adj meg egyetlen, rövid kategóriát a következő listából, ami a legjobban jellemzi a kérdést. A válaszodban csak a kategória szerepeljen, más szöveg, magyarázat, vagy írásjelek nélkül.
Lehetséges kategóriák: {categories_text}
Kérdés: '{query}'
Kategória:"""
messages = [{"role": "user", "content": prompt}]
try:
response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages,
temperature=0.1, max_tokens=30)
if response and response.choices:
category = response.choices[0].message.content.strip()
category = re.sub(r'\(.*?\)', '', category).strip()
category = re.sub(r'["\']', '', category).strip()
for cat in category_list:
if cat.lower() in category.lower():
print(f" {GREEN}-> A kérdés LLM által generált kategóriája: '{cat}'{RESET}")
return cat.lower()
print(f" {YELLOW}-> Az LLM nem talált megfelelő kategóriát, 'egyéb' kategória használata.{RESET}")
return 'egyéb'
except Exception as e:
print(f"{RED}Hiba LLM kategorizáláskor: {e}{RESET}")
return 'egyéb'
def expand_or_rewrite_query(original_query, client):
"""
Bővíti a felhasználói lekérdezést, hogy több releváns találat legyen.
"""
final_queries = [original_query]
if not CONFIG["USE_QUERY_EXPANSION"]:
return final_queries
print(f" {BLUE}-> Lekérdezés bővítése/átírása...{RESET}")
# JAVÍTOTT PROMPT: csak kulcsszavakat kérünk, magyarázat nélkül
prompt = f"Adott egy magyar nyelvű felhasználói kérdés: '{original_query}'. Generálj 2 db alternatív, releváns keresőkifejezést. A válaszodban csak ezeket add vissza, vesszővel (,) elválasztva, minden más szöveg nélkül."
messages = [{"role": "user", "content": prompt}]
try:
response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages,
temperature=0.5, max_tokens=100)
if response and response.choices:
generated_text = response.choices[0].message.content.strip()
# Módosítva: eltávolítjuk a felesleges karaktereket és magyarázó szöveget
alternatives = [q.strip().replace('"', '').replace("'", '').replace('.', '') for q in
generated_text.split(',') if q.strip() and q.strip() != original_query]
final_queries.extend(alternatives)
print(f" {GREEN}-> Bővített lekérdezések: {final_queries}{RESET}")
except Exception as e:
print(f"{RED}Hiba a lekérdezés bővítése során: {e}{RESET}")
return final_queries
def run_separate_searches(es_client, query_text, embedding_model, expanded_queries, query_category=None):
"""
Párhuzamosan futtatja a kulcsszavas és a kNN kereséseket.
"""
results = {'knn': {}, 'keyword': {}}
es_client_with_timeout = es_client.options(request_timeout=CONFIG["ES_CLIENT_TIMEOUT"])
source_fields = ["text_content", "source_url", "summary", "category"]
filters = []
# DRASZTIKUS VÁLTOZTATÁS:
# A kategóriaszűrés logikája kikapcsolva. A lekérdezés a teljes indexben fut.
# Ha a probléma a szűrésben van, ezzel a lépéssel azonosítható.
# A felhasználó igénye szerint vissza lehet kapcsolni, de először a teljes működését kell biztosítani.
# if query_category and query_category != 'egyéb':
# print(f" {MAGENTA}-> Kategória-alapú szűrés hozzáadása a kereséshez: '{query_category}'{RESET}")
# filters.append({"match": {"category": query_category}})
def knn_search(index, query_vector):
try:
knn_query = {"field": "embedding", "query_vector": query_vector, "k": CONFIG["INITIAL_SEARCH_SIZE"],
"num_candidates": CONFIG["KNN_NUM_CANDIDATES"], "filter": filters}
response = es_client_with_timeout.search(index=index, knn=knn_query, _source=source_fields,
size=CONFIG["INITIAL_SEARCH_SIZE"])
return index, response.get('hits', {}).get('hits', [])
except Exception as e:
print(f"{RED}Hiba kNN keresés során ({index}): {e}{RESET}")
return index, []
def keyword_search(index, expanded_queries):
try:
should_clauses = []
for q in expanded_queries:
should_clauses.append({"match": {"text_content": {"query": q, "operator": "OR", "fuzziness": "AUTO"}}})
query_body = {"query": {"bool": {"should": should_clauses, "minimum_should_match": 1, "filter": filters}}}
response = es_client_with_timeout.search(index=index, query=query_body['query'], _source=source_fields,
size=CONFIG["INITIAL_SEARCH_SIZE"])
return index, response.get('hits', {}).get('hits', [])
except Exception as e:
print(f"{RED}Hiba kulcsszavas keresés során ({index}): {e}{RESET}")
return index, []
query_vector = None
try:
query_vector = embedding_model.encode(query_text, normalize_embeddings=True).tolist()
except Exception as e:
print(f"{RED}Hiba az embedding generálásakor: {e}{RESET}")
with ThreadPoolExecutor(max_workers=len(CONFIG["VECTOR_INDEX_NAMES"]) * 2) as executor:
knn_futures = {executor.submit(knn_search, index, query_vector) for index in CONFIG["VECTOR_INDEX_NAMES"] if
query_vector}
keyword_futures = {executor.submit(keyword_search, index, expanded_queries) for index in
CONFIG["VECTOR_INDEX_NAMES"]}
for future in knn_futures:
index, hits = future.result()
results['knn'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)]
for future in keyword_futures:
index, hits = future.result()
results['keyword'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)]
# ÚJ LOGOLÁS: Kiírjuk a keresési találatok számát
total_knn_hits = sum(len(h) for h in results['knn'].values())
total_keyword_hits = sum(len(h) for h in results['keyword'].values())
print(f"{CYAN}Vektorkeresési találatok száma: {total_knn_hits}{RESET}")
print(f"{CYAN}Kulcsszavas keresési találatok száma: {total_keyword_hits}{RESET}")
return results
def merge_results_rrf(search_results):
"""
Egyesíti a keresési eredményeket az RRF algoritmussal.
"""
rrf_scores = defaultdict(float)
all_hits_data = {}
for search_type in search_results:
for index_name in search_results[search_type]:
for rank, hit in search_results[search_type][index_name]:
doc_id = hit['_id']
rrf_scores[doc_id] += 1.0 / (CONFIG["RRF_RANK_CONSTANT"] + rank)
if doc_id not in all_hits_data:
all_hits_data[doc_id] = hit
combined_results = [(doc_id, score, all_hits_data[doc_id]) for doc_id, score in rrf_scores.items()]
combined_results.sort(key=lambda item: item[1], reverse=True)
# ÚJ LOGOLÁS: Kiírjuk az RRF által rangsorolt top 5 pontszámot
print(
f"{CYAN}RRF által rangsorolt Top 5 pontszám: {[f'{score:.4f}' for doc_id, score, hit in combined_results[:5]]}{RESET}")
return combined_results
def retrieve_context_reranked(backend, query_text, confidence_threshold, fallback_message, query_category):
"""
Lekéri a kontextust a rangsorolás után.
"""
es_client = backend["es_client"]
embedding_model = backend["embedding_model"]
cross_encoder = backend["cross_encoder"]
llm_client = backend["llm_client"]
# DRASZTIKUS VÁLTOZTATÁS: A kategória-alapú szűrés kikapcsolva.
expanded_queries = expand_or_rewrite_query(query_text, llm_client)
search_results = run_separate_searches(es_client, query_text, embedding_model, expanded_queries)
merged_results = merge_results_rrf(search_results)
top_score = None
if not merged_results:
print(f"{YELLOW}A keresés nem hozott eredményt.{RESET}")
return fallback_message, [], top_score
candidates_to_rerank = merged_results[:CONFIG["RE_RANK_CANDIDATE_COUNT"]]
hits_data_for_reranking = [hit for _, _, hit in candidates_to_rerank]
query_chunk_pairs = [[query_text, hit['_source'].get('summary', hit['_source'].get('text_content'))] for hit in
hits_data_for_reranking if hit and '_source' in hit]
ranked_by_ce = []
if cross_encoder and query_chunk_pairs:
try:
ce_scores = cross_encoder.predict(query_chunk_pairs, show_progress_bar=False)
ranked_by_ce = sorted(zip(ce_scores, hits_data_for_reranking), key=lambda x: x[0], reverse=True)
print(f"{CYAN}Cross-Encoder pontszámok (Top 5):{RESET} {[f'{score:.4f}' for score, _ in ranked_by_ce[:5]]}")
except Exception as e:
print(f"{RED}Hiba a Cross-Encoder során: {e}{RESET}")
ranked_by_ce = []
if not ranked_by_ce and candidates_to_rerank:
print(f"{YELLOW}[INFO] Cross-Encoder nem futott, RRF sorrend használata.{RESET}")
ranked_by_ce = sorted([(score, hit) for _, score, hit in candidates_to_rerank], key=lambda x: x[0],
reverse=True)
if not ranked_by_ce:
return fallback_message, [], top_score
top_score = float(ranked_by_ce[0][0])
print(f"{GREEN}Legjobb találat pontszáma: {top_score:.4f}{RESET}")
if top_score < confidence_threshold:
print(
f"{YELLOW}A legjobb találat pontszáma ({top_score:.4f}) nem érte el a beállított küszöböt ({confidence_threshold}). A folyamat leáll.{RESET}")
dynamic_fallback = (
f"{fallback_message}\n\n"
f"A '{query_text}' kérdésre a legjobb találat megbízhatósági pontszáma ({top_score:.2f}) "
f"nem érte el a beállított küszöböt ({confidence_threshold:.2f})."
)
return dynamic_fallback, [], top_score
print(f"{GREEN}A Cross-Encoder magabiztos (legjobb score: {top_score:.4f}). A rangsorát használjuk.{RESET}")
final_hits_for_context = [hit for _, hit in ranked_by_ce[:CONFIG["NUM_CONTEXT_RESULTS"]]]
context_parts = [hit['_source'].get('summary', hit['_source'].get('text_content')) for hit in final_hits_for_context
if
hit and '_source' in hit and (hit['_source'].get('summary') or hit['_source'].get('text_content'))]
context_string = "\n\n---\n\n".join(context_parts)
sources = []
for hit_data in final_hits_for_context:
if hit_data and '_source' in hit_data:
source_info = {
"url": hit_data['_source'].get('source_url', hit_data.get('_index', '?')),
"content": hit_data['_source'].get('text_content', 'N/A')
}
if source_info not in sources:
sources.append(source_info)
return context_string, sources, top_score
def generate_answer_with_history(client, model_name, messages, temperature):
"""
Válasz generálása LLM-mel, figyelembe véve az előzményeket.
"""
try:
response = client.chat.completions.create(
model=model_name,
messages=messages,
temperature=temperature,
max_tokens=CONFIG["MAX_GENERATION_TOKENS"],
timeout=CONFIG["LLM_CLIENT_TIMEOUT"]
)
if response and response.choices:
return response.choices[0].message.content.strip()
return "Hiba: Nem érkezett érvényes válasz az AI modelltől."
except Exception as e:
error_message = str(e)
if "429" in error_message:
wait_time = 100
print(f"{YELLOW}Rate limit elérve. A program vár {wait_time} másodpercet...{RESET}")
time.sleep(wait_time)
return generate_answer_with_history(client, model_name, messages, temperature)
print(f"{RED}Hiba a válasz generálásakor: {e}{RESET}")
return "Hiba történt az AI modell hívásakor."
def search_in_feedback_index(es_client, embedding_model, question, min_score=0.75):
"""
Keres a visszajelzési adatbázisban a hasonló kérdésekre.
"""
try:
embedding = embedding_model.encode(question, normalize_embeddings=True).tolist()
knn_query = {"field": "embedding", "query_vector": embedding, "k": 1, "num_candidates": 10}
response = es_client.search(index=CONFIG["FEEDBACK_INDEX_NAME"], knn=knn_query,
_source=["question_text", "correction_text"])
hits = response.get('hits', {}).get('hits', [])
if hits and hits[0]['_score'] >= min_score:
top_hit = hits[0]
source = top_hit['_source']
score = top_hit['_score']
if score > 0.98:
return "direct_answer", source['correction_text']
instruction = f"Egy nagyon hasonló kérdésre ('{source['question_text']}') korábban a következő javítást/iránymutatást adtad: '{source['correction_text']}'. A válaszodat elsősorban ez alapján alkosd meg, még akkor is, ha a talált kontextus mást sugall!"
return "instruction", instruction
return None, None
except es_exceptions.NotFoundError:
return None, None
except Exception:
return None, None
def index_feedback(es_client, embedding_model, question, correction):
"""
Indexeli a visszajelzést.
"""
try:
embedding = embedding_model.encode(question, normalize_embeddings=True).tolist()
doc = {"question_text": question, "correction_text": correction, "embedding": embedding,
"timestamp": datetime.datetime.now()}
es_client.index(index=CONFIG["FEEDBACK_INDEX_NAME"], document=doc)
print(f"Visszajelzés sikeresen indexelve a '{CONFIG['FEEDBACK_INDEX_NAME']}' indexbe.")
return True
except Exception as e:
print(f"{RED}Hiba a visszajelzés indexelése során: {e}{RESET}")
return False
def get_all_feedback(es_client, index_name):
"""
Lekéri az összes visszajelzést.
"""
try:
response = es_client.search(index=index_name, query={"match_all": {}}, size=1000,
sort=[{"timestamp": {"order": "desc"}}])
return response.get('hits', {}).get('hits', [])
except es_exceptions.NotFoundError:
return []
except Exception as e:
print(f"{RED}Hiba a visszajelzések listázása során: {e}{RESET}")
return []
def delete_feedback_by_id(es_client, index_name, doc_id):
"""
Töröl egy visszajelzést ID alapján.
"""
try:
es_client.delete(index=index_name, id=doc_id)
return True
except Exception as e:
print(f"{RED}Hiba a visszajelzés törlése során (ID: {doc_id}): {e}{RESET}")
return False
def update_feedback_comment(es_client, index_name, doc_id, new_comment):
"""
Frissít egy visszajelzést ID alapján.
"""
try:
es_client.update(index=index_name, id=doc_id, doc={"correction_text": new_comment})
return True
except Exception as e:
print(f"{RED}Hiba a visszajelzés szerkesztése során (ID: {doc_id}): {e}{RESET}")
return False
def initialize_backend():
"""
Inicializálja a backend komponenseit.
"""
print("----- Backend Motor Inicializálása -----")
load_dotenv()
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
nltk.download('punkt', quiet=True)
warnings.filterwarnings("ignore", message=".*verify_certs=False.*")
spell_checker = None
try:
spell_checker = SpellChecker(language=CONFIG["SPELLCHECK_LANG"])
custom_words = ["dunaelektronika", "kft", "outsourcing", "dell", "lenovo", "nis2", "szerver", "kliens",
"hálózati", "hpe"]
spell_checker.word_frequency.load_words(custom_words)
except Exception as e:
print(f"{RED}Helyesírás-ellenőrző hiba: {e}{RESET}")
backend_objects = {
"es_client": Elasticsearch(CONFIG["ELASTIC_HOST"], basic_auth=("elastic", CONFIG["ELASTIC_PASSWORD"]),
verify_certs=False),
"embedding_model": SentenceTransformer(CONFIG["EMBEDDING_MODEL_NAME"],
device='cuda' if torch.cuda.is_available() else 'cpu'),
"cross_encoder": CrossEncoder(CONFIG["CROSS_ENCODER_MODEL_NAME"],
device='cuda' if torch.cuda.is_available() else 'cpu'),
"llm_client": Together(api_key=os.getenv("TOGETHER_API_KEY")),
"spell_checker": spell_checker
}
print(f"{GREEN}----- Backend Motor Készen Áll -----{RESET}")
return backend_objects
def process_query(user_question, chat_history, backend, confidence_threshold, fallback_message):
"""
A teljes lekérdezés-feldolgozási munkafolyamatot vezérli.
"""
print(f"\n{BLUE}----- Új lekérdezés feldolgozása ----{RESET}")
print(f"{BLUE}Kérdés: {user_question}{RESET}")
corrected_question = correct_spellings(user_question, backend["spell_checker"])
print(f"{BLUE}Javított kérdés: {corrected_question}{RESET}")
feedback_type, feedback_content = search_in_feedback_index(
backend["es_client"], backend["embedding_model"], corrected_question
)
if feedback_type == "direct_answer":
print(f"{GREEN}Direkt válasz a visszajelzési adatbázisból.{RESET}")
return {
"answer": feedback_content,
"sources": [
{"url": "Személyes visszajelzés alapján", "content": "Ez egy korábban megadott, pontosított válasz."}],
"corrected_question": corrected_question,
"confidence_score": 10.0
}
feedback_instructions = feedback_content if feedback_type == "instruction" else None
query_category = get_query_category_with_llm(backend["llm_client"], corrected_question)
retrieved_context, sources, confidence_score = retrieve_context_reranked(backend, corrected_question,
confidence_threshold, fallback_message,
query_category)
if not sources and not feedback_instructions:
return {
"answer": retrieved_context,
"sources": [],
"corrected_question": corrected_question,
"confidence_score": confidence_score
}
prompt_instructions = ""
if feedback_instructions:
prompt_instructions = f"""
KÜLÖNLEGESEN FONTOS FEJLESZTŐI UTASÍTÁS (ezt vedd figyelembe a leginkább!):
---
{feedback_instructions}
---
"""
system_prompt = f"""Te egy professzionális, segítőkész AI asszisztens vagy.
A feladatod, hogy a KONTEXTUS-ból és a FEJLESZTŐI UTASÍTÁSOKBÓL származó információkat egyetlen, jól strukturált és ismétlés-mentes válasszá szintetizálld.
{prompt_instructions}
KRITIKUS SZABÁLY: Értékeld a kapott KONTEXTUS relevanciáját a felhasználó kérdéséhez képest. Ha egy kontextus-részlet nem kapcsolódik szorosan a kérdéshez, azt hagyd figyelmen kívül!
FIGYELEM: Szigorúan csak a megadott KONTEXTUS-ra és a fejlesztői utasításokra támaszkodj. Ha a releváns információk alapján nem tudsz válaszolni, add ezt a választ: '{fallback_message}'
KONTEXTUS:
---
{retrieved_context if sources else "A tudásbázisban nem található releváns információ."}
---
ELŐZMÉNYEK (ha releváns): Lásd a korábbi üzeneteket.
"""
messages_for_llm = []
if chat_history:
messages_for_llm.extend(chat_history[-(CONFIG["MAX_HISTORY_TURNS"] * 2):])
messages_for_llm.append({"role": "system", "content": system_prompt})
messages_for_llm.append({"role": "user", "content": corrected_question})
answer = generate_answer_with_history(
backend["llm_client"], CONFIG["TOGETHER_MODEL_NAME"], messages_for_llm, CONFIG["GENERATION_TEMPERATURE"]
)
return {
"answer": answer,
"sources": sources,
"corrected_question": corrected_question,
"confidence_score": confidence_score
}