# -*- coding: utf-8 -*-
"""
IT Support Chatbot Application
- Converts the original Colab notebook into a deployable Gradio app.
- Connects to a prebuilt Qdrant index instead of rebuilding it on startup.
- Uses environment variables for API keys.
- Implements a RAG pipeline with LLaMA 3.1, Qdrant, and Hybrid Retrieval.
"""

# --- CELL 1: Imports, Logging & Reproducibility ---
import os
import random
import logging
import numpy as np
import torch
import nest_asyncio
import pandas as pd
import gradio as gr
from typing import List
# Llama-Index & Transformers
from llama_index.core import (
    SimpleDirectoryReader, VectorStoreIndex, StorageContext,
    PromptTemplate, Settings, QueryBundle, Document
)
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, BitsAndBytesConfig
from huggingface_hub import login
import qdrant_client
# Configure logging
logging.basicConfig(
    format='%(asctime)s %(levelname)s: %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# Apply nest_asyncio for environments like notebooks
nest_asyncio.apply()

# Reproducibility
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
# --- CELL 0: Load secrets from environment variables ---
QDRANT_HOST = os.getenv("QDRANT_HOST")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
HF_TOKEN = os.getenv("HF_TOKEN")

if not all([QDRANT_HOST, QDRANT_API_KEY, HF_TOKEN]):
    raise EnvironmentError(
        "Please set QDRANT_HOST, QDRANT_API_KEY, and HF_TOKEN environment variables."
    )

# Login to Hugging Face
login(token=HF_TOKEN)

# Initialize Qdrant client
qdrant = qdrant_client.QdrantClient(
    url=QDRANT_HOST,
    api_key=QDRANT_API_KEY,
    prefer_grpc=False
)
COLLECTION_NAME = "it_support_rag"
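# The Qdrant collection is expected to already exist; this app only reads from it
# and never rebuilds the index at startup.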

# --- CELL 3: Load Dataset & Build Documents ---
CSV_PATH = "data.csv"
if not os.path.exists(CSV_PATH):
    raise FileNotFoundError(
        f"The data file was not found at {CSV_PATH}. Please upload your data CSV and name it correctly."
    )

df = pd.read_csv(CSV_PATH, encoding="ISO-8859-1")
case_docs: List[Document] = []
for _, row in df.iterrows():
    text = str(row.get("text_chunk", ""))
    meta = {
        "source_dataset": str(row.get("source_dataset", ""))[:50],
        "category": str(row.get("category", ""))[:100],
        "orig_query": str(row.get("original_query", ""))[:200],
        "orig_solution": str(row.get("original_solution", ""))[:200],
    }
    case_docs.append(Document(text=text, metadata=meta))
logger.info(f"Loaded {len(case_docs)} documents from {CSV_PATH}.")
# Define and configure the embedding model (must match the model used to build the Qdrant collection)
# EMBED_MODEL_ID = "BAAI/bge-small-en-v1.5"
EMBED_MODEL_ID = "thenlper/gte-large"
embed_model = HuggingFaceEmbedding(
    model_name=EMBED_MODEL_ID
)

# Set the global embedding model for Llama-Index
Settings.embed_model = embed_model
logger.info(f"✅ Set global embedding model to {EMBED_MODEL_ID}")

# --- CELL 4: Load prebuilt Vector Index ---
vector_store = QdrantVectorStore(
    client=qdrant,
    collection_name=COLLECTION_NAME,
    prefer_grpc=False
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# Load the existing index directly from the vector store (no re-indexing on startup)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
logger.info("✅ Loaded existing VectorStoreIndex from Qdrant")

# --- CELL 5: Define Hybrid Retriever & Reranker ---
Settings.llm = None  # Use our own LLM pipeline instead of a Llama-Index-managed LLM


class HybridRetriever(BaseRetriever):
    """Union of dense (vector) and BM25 hits, de-duplicated by node ID."""

    def __init__(self, dense, bm25):
        super().__init__()
        self.dense = dense
        self.bm25 = bm25

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        dense_hits = self.dense.retrieve(query_bundle)
        bm25_hits = self.bm25.retrieve(query_bundle)
        combined = dense_hits + bm25_hits
        unique = []
        seen = set()
        for hit in combined:
            nid = hit.node.node_id
            if nid not in seen:
                seen.add(nid)
                unique.append(hit)
        return unique


# Instantiate retrievers
dense_retriever = index.as_retriever(similarity_top_k=10)
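# BM25 works over nodes re-chunked locally from the CSV documents,
# independent of the dense vectors stored in Qdrant.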
bm25_nodes = SentenceSplitter(chunk_size=1024, chunk_overlap=100).get_nodes_from_documents(case_docs)
bm25_retriever = BM25Retriever.from_defaults(
    nodes=bm25_nodes,
    similarity_top_k=10,
)
hybrid_retriever = HybridRetriever(dense=dense_retriever, bm25=bm25_retriever)

reranker = SentenceTransformerRerank(
    model="cross-encoder/ms-marco-MiniLM-L-2-v2",
    top_n=4,
    device="cuda" if torch.cuda.is_available() else "cpu"
)

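# The query engine is used only for retrieval and reranking; the answer itself is
# generated by the local transformers pipeline defined in CELL 6.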
query_engine = RetrieverQueryEngine(
    retriever=hybrid_retriever,
    node_postprocessors=[reranker]
)

# --- CELL 6: Load & Quantize LLaMA Model ---
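# 4-bit NF4 quantization with double quantization reduces the GPU memory needed for the 8B model.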
quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16
)

MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"
logger.info(f"Loading model: {MODEL_ID}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
llm = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=quant_config,
    device_map="auto"
)
logger.info("Model loaded successfully.")

generator = pipeline(
    task="text-generation",
    model=llm,
    tokenizer=tokenizer,
    device_map="auto"
)

# --- CELL 7: Chat Logic and Prompting ---
SYSTEM_PROMPT = (
    "You are a friendly and helpful Level 0 IT Support Assistant. "
    "Use a conversational tone and guide users step-by-step. "
    "If the user's question lacks details or clarity, ask a concise follow-up question "
    "to gather the information you need before providing a solution. "
    "Once clarified, then:\n"
    "1. Diagnose the problem.\n"
    "2. Provide step-by-step solutions with bullet points.\n"
    "3. Offer additional recommendations or safety warnings.\n"
    "4. End with a polite closing."
)

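# LLaMA 3.x chat-template special tokens, used to assemble prompts manually
# instead of via tokenizer.apply_chat_template.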
HDR = {
    "sys": "<|start_header_id|>system<|end_header_id|>",
    "usr": "<|start_header_id|>user<|end_header_id|>",
    "ast": "<|start_header_id|>assistant<|end_header_id|>",
    "eot": "<|eot_id|>"
}

chat_history = []
GREETINGS = {"hello", "hi", "hey", "good morning", "good afternoon", "good evening"}


def format_history(history):
    return "".join(
        f"{HDR['usr']}\n{u}{HDR['eot']}{HDR['ast']}\n{a}{HDR['eot']}" for u, a in history
    )


def build_prompt(query, context, history):
    if query.lower().strip() in GREETINGS:
        return None, "greeting"
    words = query.strip().split()
    if len(words) < 3:
        return (
            "Could you provide more detail about what you're experiencing? "
            "Any error messages or steps you've tried will help me assist you."
        ), "clarify"
    context_str = "\n---\n".join(node.text for node in context) if context else "No context provided."
    hist_str = format_history(history[-3:])
    prompt = (
        "<|begin_of_text|>"
        f"{HDR['sys']}\n{SYSTEM_PROMPT}{HDR['eot']}"
        f"{hist_str}"
        f"{HDR['usr']}\nContext:\n{context_str}\n\nQuestion: {query}{HDR['eot']}"
        f"{HDR['ast']}\n"
    )
    return prompt, "rag"


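# chat() returns a plain string for greeting/clarification turns and an
# (answer, context_nodes) tuple for RAG turns; respond() in the Gradio section handles both shapes.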
def chat(query, temperature=0.7, top_p=0.9):
    global chat_history
    prompt, mode = build_prompt(query, [], chat_history)

    if mode == "greeting":
        reply = "Hello there! How can I help with your IT support question today?"
        chat_history.append((query, reply))
        return reply
    if mode == "clarify":
        reply = prompt
        chat_history.append((query, reply))
        return reply

    # Retrieve and rerank context, then rebuild the prompt with it
    response = query_engine.query(query)
    context_nodes = response.source_nodes
    prompt, _ = build_prompt(query, context_nodes, chat_history)

    gen_args = {
        "do_sample": True,
        "max_new_tokens": 350,
        "temperature": temperature,
        "top_p": top_p,
        "eos_token_id": tokenizer.eos_token_id
    }
    output = generator(prompt, **gen_args)
    text = output[0]["generated_text"]
    answer = text.split(HDR["ast"])[-1].strip()
    chat_history.append((query, answer))
    return answer, context_nodes


# --- CELL 8: Gradio Interface ---
with gr.Blocks(theme=gr.themes.Soft(), title="💬 Level 0 IT Support Chatbot") as demo:
    gr.Markdown("### 🤖 Level 0 IT Support Chatbot (RAG + Qdrant + LLaMA3)")
    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(label="Chat", height=500, bubble_full_width=False)
            inp = gr.Textbox(placeholder="Ask your IT support question...", label="Your Message", lines=2)
            with gr.Row():
                send_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear Chat", variant="secondary")
        with gr.Column(scale=1):
            gr.Markdown("### ⚙️ Settings")
            k_slider = gr.Slider(1, 20, value=10, step=1, label="Context Hits (k)")
            temp_slider = gr.Slider(0.0, 1.0, value=0.7, step=0.01, label="Temperature")
            top_p_slider = gr.Slider(0.0, 1.0, value=0.9, step=0.01, label="Top-p")
            with gr.Accordion("Show Retrieved Context", open=False):
                context_display = gr.Textbox(label="Retrieved Context", interactive=False, lines=10)
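
    # respond() feeds the user's message through the RAG pipeline; the k slider
    # adjusts both retrievers' similarity_top_k before each query.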
    def respond(message, history, k, temp, top_p):
        global chat_history
        dense_retriever.similarity_top_k = k
        bm25_retriever.similarity_top_k = k

        chat_result = chat(message, temperature=temp, top_p=top_p)
        # RAG turns return (reply, context_nodes); greeting/clarification turns return only a string
        if isinstance(chat_result, tuple) and len(chat_result) == 2:
            reply, context_nodes = chat_result
        else:
            reply = chat_result
            context_nodes = []

        ctx_text = "\n\n---\n\n".join([
            f"**Source {i+1} (Score: {node.score:.4f})**\n{node.text}"
            for i, node in enumerate(context_nodes)
        ])
        history.append([message, reply])
        return "", history, ctx_text
    def clear_chat():
        global chat_history
        chat_history = []
        return [], None

    inp.submit(respond, [inp, chatbot, k_slider, temp_slider, top_p_slider], [inp, chatbot, context_display])
    send_btn.click(respond, [inp, chatbot, k_slider, temp_slider, top_p_slider], [inp, chatbot, context_display])
    clear_btn.click(clear_chat, None, [chatbot, context_display], queue=False)


if __name__ == "__main__":
    logger.info("Launching Gradio interface...")
    demo.launch(server_name="0.0.0.0", server_port=7860, debug=True)