"""Interactive Gradio chat UI for the Kenya Policy Assistant.

Retrieves and reranks policy documents for each user question, optionally
runs sentiment and coherence analysis over the retrieved documents, and
streams a grounded answer from an OpenAI-compatible LLM endpoint.
"""

import gradio as gr
import requests
import numpy as np
import time
import json
import os

# Import the utilities with proper error handling
try:
    from utils.encoding_input import encode_text
    from utils.retrieve_n_rerank import retrieve_and_rerank
    from utils.sentiment_analysis import get_sentiment
    from utils.coherence_bbscore import coherence_report
    from utils.loading_embeddings import get_vectorstore
    from utils.model_generation import build_messages
except ImportError as e:
    print(f"Import error: {e}")
    print("Make sure you're running from the correct directory and all dependencies are installed.")

# SECURITY: never commit a real API key as a fallback default — the previous
# hard-coded key has been removed. Set the API_KEY environment variable.
API_KEY = os.getenv("API_KEY", "")
MODEL = "llama3.3-70b-instruct"

# Global settings for sentiment and coherence analysis
ENABLE_SENTIMENT = True
ENABLE_COHERENCE = True


def chat_response(message, history, enable_sentiment, enable_coherence):
    """Generate a streamed response for the chat interface.

    This is a generator: it yields the progressively accumulated answer so
    Gradio can render it token by token.

    Args:
        message: Current user message.
        history: List of [user_message, bot_response] pairs.
        enable_sentiment: Whether to enable sentiment analysis.
        enable_coherence: Whether to enable coherence analysis.

    Yields:
        The response text accumulated so far (or an error message).
    """
    try:
        # Initialize vectorstore lazily, per request
        vectorstore = get_vectorstore()

        # Retrieve a broad candidate set, then rerank with a cross-encoder
        reranked_results = retrieve_and_rerank(
            query_text=message,
            vectorstore=vectorstore,
            k=50,  # number of initial documents to retrieve
            rerank_model="cross-encoder/ms-marco-MiniLM-L-6-v2",
            top_m=20,  # number of documents to return after reranking
            min_score=0.5,  # minimum score for reranked documents
            only_docs=False,  # return both documents and scores
        )

        if not reranked_results:
            # NOTE: this function is a generator, so a plain `return <value>`
            # would silently discard the message — it must be yielded.
            yield "I'm sorry, I couldn't find any relevant information in the policy documents to answer your question. Could you try rephrasing your question or asking about a different topic?"
            return

        top_docs = [doc for doc, score in reranked_results]

        # Perform sentiment and coherence analysis if enabled
        sentiment_rollup = get_sentiment(top_docs) if enable_sentiment else {}
        coherence_report_ = coherence_report(reranked_results=top_docs, input_text=message) if enable_coherence else ""

        # Build messages for the LLM, including conversation history
        messages = build_messages_with_history(
            query=message,
            history=history,
            top_docs=top_docs,
            task_mode="verbatim_sentiment",
            sentiment_rollup=sentiment_rollup,
            coherence_report=coherence_report_,
        )

        # Stream response from the API, re-yielding the growing text
        response = ""
        for chunk in stream_llm_response(messages):
            response += chunk
            yield response

    except Exception as e:
        error_msg = f"I encountered an error while processing your request: {str(e)}"
        yield error_msg


def build_messages_with_history(query, history, top_docs, task_mode, sentiment_rollup, coherence_report):
    """Build the chat-completion message list, including conversation history.

    Args:
        query: The current user question.
        history: List of [user_message, bot_response] pairs.
        top_docs: Reranked documents (objects with ``page_content`` and
            optionally ``metadata``) used as grounding context.
        task_mode: Task label (currently unused in the prompt body).
        sentiment_rollup: Sentiment summary dict, or falsy to omit.
        coherence_report: Coherence report string, or falsy to omit.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts for the API.
    """
    # System message
    system_msg = (
        "You are a compliance-grade policy analyst assistant specializing in Kenya policy documents. "
        "Your job is to return precise, fact-grounded responses based on the provided policy documents. "
        "Avoid hallucinations. Base everything strictly on the content provided. "
        "Maintain conversation context from previous exchanges when relevant. "
        "If sentiment or coherence analysis is not available, do not mention it in the response."
    )

    messages = [{"role": "system", "content": system_msg}]

    # Add conversation history (keep last 4 exchanges to maintain context
    # without exceeding limits; slicing handles shorter histories too)
    recent_history = history[-4:]
    for user_msg, bot_msg in recent_history:
        messages.append({"role": "user", "content": user_msg})
        messages.append({"role": "assistant", "content": bot_msg})

    # Build context from retrieved documents; metadata may be missing, so
    # fall back to "Unknown" for source/page
    context_block = "\n\n".join([
        f"**Source: {getattr(doc, 'metadata', {}).get('source', 'Unknown')} "
        f"(Page {getattr(doc, 'metadata', {}).get('page', 'Unknown')})**\n"
        f"{doc.page_content}\n"
        for doc in top_docs[:10]  # Limit to top 10 docs to avoid token limits
    ])

    # Current user query with context
    current_query = f"""
Query: {query}

Based on the following policy documents, please provide:

1) **Quoted Policy Excerpts**: Quote key policy content directly. Cite the source using filename and page.

2) **Analysis**: Explain the policy implications in clear terms.
"""

    if sentiment_rollup:
        current_query += f"\n3) **Sentiment Summary**: {sentiment_rollup}"
    if coherence_report:
        current_query += f"\n4) **Coherence Assessment**: {coherence_report}"

    current_query += f"\n\nContext Sources:\n{context_block}"

    messages.append({"role": "user", "content": current_query})
    return messages


def stream_llm_response(messages):
    """Stream response chunks from the LLM API.

    Parses the server-sent-event lines (``data: {...}``) emitted by an
    OpenAI-compatible chat-completions endpoint and yields each content
    delta as it arrives. Network or API failures are yielded as
    ``[ERROR] ...`` strings rather than raised.

    Args:
        messages: Chat-completion message dicts.

    Yields:
        Content delta strings (or an ``[ERROR] ...`` string on failure).
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }
    data = {
        "model": MODEL,
        "messages": messages,
        "temperature": 0.2,
        "stream": True,
        "max_tokens": 2000,
    }

    try:
        with requests.post(
            "https://inference.do-ai.run/v1/chat/completions",
            headers=headers,
            json=data,
            stream=True,
            timeout=30,
        ) as r:
            if r.status_code != 200:
                yield f"[ERROR] API returned status {r.status_code}: {r.text}"
                return

            for line in r.iter_lines(decode_unicode=True):
                # Skip keep-alive blanks and the end-of-stream sentinel
                if not line or line.strip() == "data: [DONE]":
                    continue
                if line.startswith("data: "):
                    line = line[len("data: "):]
                try:
                    chunk = json.loads(line)
                    delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                    if delta:
                        yield delta
                        time.sleep(0.01)  # Small delay for smooth streaming
                except json.JSONDecodeError:
                    # Non-JSON SSE line (e.g. partial frame) — ignore it
                    continue
                except Exception as e:
                    print(f"Streaming error: {e}")
                    continue

    except requests.exceptions.RequestException as e:
        yield f"[ERROR] Network error: {str(e)}"
    except Exception as e:
        yield f"[ERROR] Unexpected error: {str(e)}"


def update_sentiment_setting(enable):
    """Update global sentiment analysis setting."""
    global ENABLE_SENTIMENT
    ENABLE_SENTIMENT = enable
    return f"Sentiment analysis {'enabled' if enable else 'disabled'}"


def update_coherence_setting(enable):
    """Update global coherence analysis setting."""
    global ENABLE_COHERENCE
    ENABLE_COHERENCE = enable
    return f"Coherence analysis {'enabled' if enable else 'disabled'}"


# Create the chat interface
with gr.Blocks(title="Kenya Policy Assistant - Chat", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # đŸ›ī¸ Kenya Policy Assistant - Interactive Chat

    Ask questions about Kenya's policies and have a conversation! I can help you understand policy documents with sentiment and coherence analysis.
    """)

    with gr.Row():
        with gr.Column(scale=3):
            # Main chat interface
            chatbot = gr.Chatbot(
                height=600,
                bubble_full_width=False,
                show_copy_button=True,
                show_share_button=True,
            )

            with gr.Row():
                sentiment_toggle = gr.Checkbox(
                    label="Enable Sentiment Analysis",
                    value=True,
                    info="Analyze the tone and sentiment of policy documents",
                )
                coherence_toggle = gr.Checkbox(
                    label="Enable Coherence Analysis",
                    value=True,
                    info="Check coherence and consistency of retrieved documents",
                )

        with gr.Column(scale=1):
            gr.Markdown("""
            ### 💡 Tips for Better Results
            - Ask specific questions about Kenya policies
            - You can ask follow-up questions
            - Reference previous answers in your questions
            - Use phrases like "What does this mean?" or "Can you elaborate?"

            ### 📝 Example Questions
            - "What are Kenya's renewable energy policies?"
            - "Tell me about water management regulations"
            - "What penalties exist for environmental violations?"
            - "How does this relate to what you just mentioned?"
            """)

            with gr.Accordion("âš™ī¸ Settings", open=False):
                gr.Markdown("Toggle analysis features on/off")
                sentiment_status = gr.Textbox(
                    value="Sentiment analysis enabled",
                    label="Sentiment Status",
                    interactive=False,
                )
                coherence_status = gr.Textbox(
                    value="Coherence analysis enabled",
                    label="Coherence Status",
                    interactive=False,
                )

    # Create the chat interface with custom response function.
    # The lambda reads the module-level toggles at call time, so checkbox
    # changes take effect on the next message.
    chat_interface = gr.ChatInterface(
        fn=lambda message, history: chat_response(message, history, ENABLE_SENTIMENT, ENABLE_COHERENCE),
        chatbot=chatbot,
        title="",  # We already have a title above
        description="",  # We already have description above
        examples=[
            "What are the objectives of Kenya's energy policies?",
            "Tell me about environmental protection regulations",
            "What are the penalties for water pollution?",
            "How are renewable energy projects regulated?",
            "What does the constitution say about natural resources?",
        ],
        cache_examples=False,
        retry_btn="🔄 Retry",
        undo_btn="â†Šī¸ Undo",
        clear_btn="đŸ—‘ī¸ Clear Chat",
    )

    # Update settings when toggles change
    sentiment_toggle.change(
        fn=update_sentiment_setting,
        inputs=[sentiment_toggle],
        outputs=[sentiment_status],
    )
    coherence_toggle.change(
        fn=update_coherence_setting,
        inputs=[coherence_toggle],
        outputs=[coherence_status],
    )


if __name__ == "__main__":
    print("🚀 Starting Kenya Policy Assistant Chat...")
    demo.queue(max_size=20).launch(
        share=True,
        debug=True,
        server_name="0.0.0.0",
        server_port=7860,
    )