Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import logging | |
| from typing import List, Dict, Tuple | |
| import numpy as np | |
| from analyzer import combine_repo_files_for_llm, handle_load_repository | |
| from hf_utils import download_filtered_space_files | |
# Module-level logger, named after this module via __name__; every function
# and method below reports progress and errors through it.
logger = logging.getLogger(__name__)
class SimpleVectorStore:
    """Simple in-memory vector store for repository chunks.

    Chunk texts, their embeddings and their metadata are kept in three
    parallel lists: position ``i`` of each list describes the same chunk.
    """

    def __init__(self):
        self.chunks = []
        self.embeddings = []
        self.chunk_metadata = []
        # Embedding model; created on first use by _get_embedding_model().
        self.model = None

    def _get_embedding_model(self):
        """Lazy load the embedding model.

        Returns:
            The cached model instance (created on the first call).

        Raises:
            ImportError: if the ``sentence-transformers`` package is missing.
        """
        if self.model is None:
            try:
                from sentence_transformers import SentenceTransformer
            except ImportError as err:
                logger.error("sentence-transformers not installed. Install with: pip install sentence-transformers")
                # Chain the cause so the original import failure stays visible.
                raise ImportError("sentence-transformers package is required for vectorization") from err
            self.model = SentenceTransformer('all-MiniLM-L6-v2')  # Lightweight, fast model
            logger.info("Loaded SentenceTransformer model for vectorization")
        return self.model

    def add_chunks(self, chunks: List[str], metadata: List[Dict] = None):
        """Add text chunks and create embeddings.

        This is best-effort: failures are logged rather than raised, and a
        failed call leaves the store exactly as it was.

        Args:
            chunks: Texts to embed and store.
            metadata: One dict per chunk, in the same order. When omitted or
                empty, every chunk gets an empty dict.

        Returns:
            bool: True if all chunks were stored (or there was nothing to
            store), False if the call was rejected or embedding failed.
        """
        if not chunks:
            # Nothing to embed; do not load the model just for that.
            return True
        if not metadata:
            metadata = [{} for _ in chunks]
        elif len(metadata) != len(chunks):
            # Misaligned metadata would corrupt the parallel lists and make
            # every later search fail, so refuse the whole batch.
            logger.error(
                f"Error adding chunks to vector store: got {len(chunks)} chunks "
                f"but {len(metadata)} metadata entries"
            )
            return False
        try:
            model = self._get_embedding_model()
            embeddings = list(model.encode(chunks, convert_to_tensor=False))
            if len(embeddings) != len(chunks):
                raise ValueError(
                    f"model returned {len(embeddings)} embeddings for {len(chunks)} chunks"
                )
        except Exception as e:
            logger.error(f"Error adding chunks to vector store: {e}")
            return False
        # All three extends operate on plain lists of equal length, so the
        # store cannot end up half-updated.
        self.chunks.extend(chunks)
        self.embeddings.extend(embeddings)
        self.chunk_metadata.extend(metadata)
        logger.info(f"Added {len(chunks)} chunks to vector store")
        return True

    def search(self, query: str, top_k: int = 3) -> List[Tuple[str, float, Dict]]:
        """Search for similar chunks using cosine similarity.

        Args:
            query: Text to compare against the stored chunks.
            top_k: Maximum number of results; values <= 0 yield no results.

        Returns:
            Up to ``top_k`` ``(chunk, similarity, metadata)`` tuples, best
            match first. Ties keep insertion order. An empty list is returned
            when the store is empty or when the search fails (the error is
            logged).
        """
        if top_k <= 0 or not self.chunks or not self.embeddings:
            return []
        try:
            model = self._get_embedding_model()
            query_embedding = np.asarray(
                model.encode([query], convert_to_tensor=False)[0], dtype=float
            )
            matrix = np.asarray(self.embeddings, dtype=float)
            # One matrix-vector product instead of a Python loop per chunk.
            dots = matrix @ query_embedding
            norms = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_embedding)
            # A zero-norm vector has no direction; score it 0.0 instead of
            # letting 0/0 produce NaN, which would make the ordering undefined.
            scores = np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)
            # Stable sort on the negated scores: descending, ties in insertion order.
            best = np.argsort(-scores, kind="stable")[:top_k]
            return [
                (self.chunks[i], float(scores[i]), self.chunk_metadata[i])
                for i in best
            ]
        except Exception as e:
            logger.error(f"Error searching vector store: {e}")
            return []

    def clear(self):
        """Clear all stored data (the loaded model, if any, is kept)."""
        self.chunks = []
        self.embeddings = []
        self.chunk_metadata = []

    def get_stats(self) -> Dict:
        """Get statistics about the vector store.

        Returns:
            Dict with ``total_chunks``, ``total_embeddings`` and
            ``model_loaded`` (whether the embedding model has been created).
        """
        return {
            'total_chunks': len(self.chunks),
            'total_embeddings': len(self.embeddings),
            'model_loaded': self.model is not None,
        }
# Global vector store instance shared by the functions in this module:
# vectorize_repository_content() clears and refills it, and
# handle_repo_bot_response() searches it.
vector_store = SimpleVectorStore()
def _split_into_line_chunks(lines, repo_id, chunk_size, overlap):
    """Cut *lines* into overlapping windows and build one metadata dict per kept window."""
    step = chunk_size - overlap
    chunks = []
    metadata = []
    for start in range(0, len(lines), step):
        chunk_text = '\n'.join(lines[start:start + chunk_size])
        if chunk_text.strip():  # Only keep non-empty chunks
            chunks.append(chunk_text)
            metadata.append({
                'repo_id': repo_id,
                'chunk_index': len(chunks) - 1,
                'start_line': start,
                # Exclusive end index into the split lines.
                'end_line': min(start + chunk_size, len(lines)),
            })
        if start + chunk_size >= len(lines):
            # This window already reached the last line; another window
            # would only repeat lines from the overlap region.
            break
    return chunks, metadata


def vectorize_repository_content(repo_content: str, repo_id: str, chunk_size: int = 500) -> bool:
    """
    Vectorize repository content by splitting into chunks and creating embeddings.

    The global vector store is cleared first, so it only ever holds the
    chunks of the most recently vectorized repository.

    Args:
        repo_content: The combined repository content
        repo_id: Repository identifier
        chunk_size: Number of lines per chunk (must be >= 1)
    Returns:
        bool: True only if at least one chunk was produced and all chunks
        ended up in the vector store. False for empty content, an invalid
        chunk_size, or any embedding failure (errors are logged, not raised).
    """
    try:
        # Clear previous data
        vector_store.clear()
        if chunk_size < 1:
            logger.error(f"Error vectorizing repository content: invalid chunk_size {chunk_size}")
            return False
        # Lines of overlap between consecutive chunks for better context.
        # The usual value is 50; for tiny chunk sizes it is reduced so the
        # window still advances (a step of zero or less would break range()).
        overlap = 50 if chunk_size > 50 else chunk_size // 2
        lines = repo_content.split('\n')
        chunks, metadata = _split_into_line_chunks(lines, repo_id, chunk_size, overlap)
        if not chunks:
            logger.warning(f"No non-empty content to vectorize for repository {repo_id}")
            return False
        # Add chunks to vector store
        vector_store.add_chunks(chunks, metadata)
        # add_chunks logs its own failures instead of raising, so confirm the
        # chunks really arrived before reporting success. The store was
        # cleared above, so its count must equal the number just submitted.
        stored = vector_store.get_stats()['total_chunks']
        if stored != len(chunks):
            logger.error(
                f"Error vectorizing repository content: only {stored} of "
                f"{len(chunks)} chunks were stored for repository {repo_id}"
            )
            return False
        logger.info(f"Successfully vectorized {len(chunks)} chunks for repository {repo_id}")
        return True
    except Exception as e:
        logger.error(f"Error vectorizing repository content: {e}")
        return False
def create_repo_explorer_tab() -> Tuple[Dict[str, gr.components.Component], Dict[str, gr.State]]:
    """
    Creates the Repo Explorer tab content and returns the component references and state variables.

    Returns:
        A ``(components, states)`` pair: ``components`` maps a name to each
        interactive Gradio component created here, and ``states`` maps a name
        to each ``gr.State`` (the repository context summary and the current
        repository ID, both initially empty strings).
    """
    # NOTE(review): the nesting of the `with` blocks below was reconstructed
    # from a copy of this file whose indentation was lost - confirm the layout
    # (in particular that the message row sits inside the chat column).
    # State variables for repo explorer
    states = {
        "repo_context_summary": gr.State(""),
        "current_repo_id": gr.State("")
    }
    gr.Markdown("### ποΈ Deep Dive into a Specific Repository")
    # Row 1: repository ID input next to the load button.
    with gr.Row():
        with gr.Column(scale=2):
            repo_explorer_input = gr.Textbox(
                label="π Repository ID",
                placeholder="microsoft/DialoGPT-medium",
                info="Enter a Hugging Face repository ID to explore"
            )
        with gr.Column(scale=1):
            load_repo_btn = gr.Button("π Load Repository", variant="primary", size="lg")
    # Row 2: link to the repository; created hidden and empty.
    with gr.Row():
        visit_hf_link = gr.HTML(
            value="",
            label="π Repository Link",
            visible=False
        )
    # Row 3: read-only status box.
    with gr.Row():
        repo_status_display = gr.Textbox(
            label="π Repository Status",
            interactive=False,
            lines=4,
            info="Current repository loading status and vectorization info"
        )
    # Row 4: chat area (history plus the message input and send button).
    with gr.Row():
        with gr.Column(scale=2):
            repo_chatbot = gr.Chatbot(
                label="π€ Repository Assistant",
                height=400,
                type="messages",
                avatar_images=(
                    "https://cdn-icons-png.flaticon.com/512/149/149071.png",
                    "https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png"
                ),
                show_copy_button=True,
                value=[]  # Start empty - welcome message will appear only after repo is loaded
            )
            with gr.Row():
                repo_msg_input = gr.Textbox(
                    label="π­ Ask about this repository",
                    placeholder="What does this repository do? How do I use it?",
                    lines=1,
                    scale=4,
                    info="Ask anything about the loaded repository"
                )
                repo_send_btn = gr.Button("π€ Send", variant="primary", scale=1)
        # Disabled: side column with a preview of the repository content.
        # with gr.Column(scale=1):
        #     # Repository content preview
        #     repo_content_display = gr.Textbox(
        #         label="π Repository Content Preview",
        #         lines=20,
        #         show_copy_button=True,
        #         interactive=False,
        #         info="Overview of the loaded repository structure and content"
        #     )
    # Component references
    components = {
        "repo_explorer_input": repo_explorer_input,
        "load_repo_btn": load_repo_btn,
        "visit_hf_link": visit_hf_link,
        "repo_status_display": repo_status_display,
        "repo_chatbot": repo_chatbot,
        "repo_msg_input": repo_msg_input,
        "repo_send_btn": repo_send_btn,
        # "repo_content_display": repo_content_display
    }
    return components, states
def handle_repo_user_message(user_message: str, history: List[Dict[str, str]], repo_context_summary: str, repo_id: str) -> Tuple[List[Dict[str, str]], str]:
    """Handle user messages in the repo-specific chatbot.

    Args:
        user_message: Text typed by the user; ignored when empty or whitespace-only.
        history: Chat history as a list of ``{"role", "content"}`` dicts (may be None).
        repo_context_summary: Summary of the loaded repository; when it is
            empty or None no repository is loaded and nothing is appended.
        repo_id: Identifier of the loaded repository, used in the welcome message.

    Returns:
        ``(new_history, "")`` - the updated history and an empty string that
        clears the message input box. The caller's ``history`` list is never
        modified; a new list is returned instead.
    """
    # Work on a copy so the list passed in by the caller is left untouched.
    history = list(history or [])
    if not (repo_context_summary or "").strip():
        return history, ""
    # Initialize with repository-specific welcome message if empty
    if not history:
        welcome_msg = f"Hello! I'm your assistant for the '{repo_id}' repository. I have analyzed all the files and created a comprehensive understanding of this repository. I'm ready to answer any questions about its functionality, usage, architecture, and more. What would you like to know?"
        history.append({"role": "assistant", "content": welcome_msg})
    # A blank message would only trigger a pointless model call downstream.
    if user_message and user_message.strip():
        history.append({"role": "user", "content": user_message})
    return history, ""
def _format_vector_context(relevant_chunks):
    """Render vector-search hits as a prompt section; returns "" when there are no hits."""
    if not relevant_chunks:
        return ""
    sections = []
    for i, (chunk, similarity, metadata) in enumerate(relevant_chunks):
        start_line = metadata.get('start_line', 'unknown')
        end_line = metadata.get('end_line', 'unknown')
        sections.append(
            f"\n--- Relevant Section {i+1} (similarity: {similarity:.3f}, lines {start_line}-{end_line}) ---\n{chunk}\n"
        )
    return "\n\n=== MOST RELEVANT CODE SECTIONS ===\n" + "".join(sections)


def handle_repo_bot_response(history: List[Dict[str, str]], repo_context_summary: str, repo_id: str) -> List[Dict[str, str]]:
    """Generate bot response for repo-specific questions using comprehensive context and vector search.

    Args:
        history: Chat history; a reply is only generated when its last entry
            has role ``"user"``.
        repo_context_summary: Summary of the loaded repository; when empty or
            None the history is returned unchanged.
        repo_id: Identifier of the loaded repository, named in the prompt.

    Returns:
        The same ``history`` list, with one assistant message appended when a
        reply (or an error notice) was produced. Errors from the model call
        are logged and reported to the user in the chat rather than raised.
    """
    if not history or history[-1]["role"] != "user" or not (repo_context_summary or "").strip():
        return history
    user_message = history[-1]["content"]
    # Use vector search to find relevant chunks and turn them into prompt text
    vector_context = _format_vector_context(vector_store.search(user_message, top_k=3))
    # Create a specialized prompt using both comprehensive context and vector search results
    repo_system_prompt = f"""You are an expert assistant for the Hugging Face repository '{repo_id}'.
You have comprehensive knowledge about this repository based on detailed analysis of all its files and components.
Use the following comprehensive analysis to answer user questions accurately and helpfully:
{repo_context_summary}
{vector_context}
Instructions:
- Answer questions clearly and conversationally about this specific repository
- Reference specific components, functions, or features when relevant
- Provide practical guidance on installation, usage, and implementation
- If asked about code details, refer to the analysis above and the relevant code sections
- Use the most relevant code sections to provide specific examples and implementation details
- Be helpful and informative while staying focused on this repository
- If something isn't covered in the analysis, acknowledge the limitation
Answer the user's question based on your comprehensive knowledge of this repository."""
    try:
        from openai import OpenAI
        base_url = os.getenv("base_url")
        if not base_url:
            # Fail with a clear message, and never fall back to the library's
            # default endpoint with a key that was issued for another service.
            raise RuntimeError("the 'base_url' environment variable is not set")
        client = OpenAI(api_key=os.getenv("modal_api"), base_url=base_url)
        # Only the system prompt and the latest question are sent; earlier
        # turns of the conversation are not part of the request.
        response = client.chat.completions.create(
            model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
            messages=[
                {"role": "system", "content": repo_system_prompt},
                {"role": "user", "content": user_message}
            ],
            max_tokens=1024,
            temperature=0.7
        )
        # The API may return no text; never store None as message content.
        bot_response = response.choices[0].message.content or "I'm sorry, the model returned an empty response. Please try rephrasing your question."
        history.append({"role": "assistant", "content": bot_response})
    except Exception as e:
        logger.exception(f"Error generating repo bot response: {e}")
        error_response = f"I apologize, but I encountered an error while processing your question: {e}"
        history.append({"role": "assistant", "content": error_response})
    return history
def get_huggingface_url(repo_id: str) -> str:
    """Generate the Hugging Face Spaces URL for a repository.

    Args:
        repo_id: Space identifier such as ``"user/space"``; surrounding
            whitespace is ignored.

    Returns:
        The Spaces URL, or ``""`` when ``repo_id`` is None, empty or blank.
    """
    # Strip before building the URL so stray whitespace never ends up in it.
    clean_repo_id = str(repo_id).strip() if repo_id else ""
    if not clean_repo_id:
        return ""
    return f"https://huggingface.co/spaces/{clean_repo_id}"
def generate_repo_link_html(repo_id: str) -> str:
    """Generate HTML with clickable link for the repository.

    The repository ID comes from a user-editable text box, so it is
    percent-encoded inside the URL and HTML-escaped wherever it is inserted
    into the markup.

    Args:
        repo_id: Space identifier such as ``"user/space"``; surrounding
            whitespace is ignored.

    Returns:
        An HTML snippet containing the link, or ``""`` when ``repo_id`` is
        None, empty or blank.
    """
    from html import escape
    from urllib.parse import quote

    clean_repo_id = str(repo_id).strip() if repo_id else ""
    if not clean_repo_id:
        return ""
    # Keep "/" so "user/space" stays a two-segment path; everything else that
    # is not URL-safe is percent-encoded, then escaped for the attribute.
    hf_url = escape(f"https://huggingface.co/spaces/{quote(clean_repo_id, safe='/')}", quote=True)
    link_text = escape(clean_repo_id)
    html_link = f'''
<div style="margin: 10px 0; padding: 15px; background: rgba(255, 255, 255, 0.1); border-radius: 12px; backdrop-filter: blur(10px); text-align: center;">
<a href="{hf_url}" target="_blank" style="display: inline-block; padding: 12px 24px; background: linear-gradient(45deg, #667eea, #764ba2); color: white; text-decoration: none; border-radius: 8px; font-weight: 600; font-size: 16px; transition: all 0.3s ease; box-shadow: 0 4px 12px rgba(0,0,0,0.2);">
π Visit {link_text} on Hugging Face
</a>
</div>
'''
    return html_link
def handle_load_repository_with_vectorization(repo_id: str) -> Tuple[str, str, gr.HTML]:
    """Load repository and create both context summary and vector embeddings.

    Args:
        repo_id: Repository ID typed by the user. Surrounding whitespace is
            removed before the ID is used anywhere; None counts as empty.

    Returns:
        A ``(status, context_summary, link_update)`` triple: the status text
        for the status box, the context summary (``""`` on any failure) and a
        ``gr.update`` for the repository link (hidden on any failure).
        Errors are logged and reported through the status text, not raised.
    """
    # Normalize once so the download, the summary, the status text and the
    # link all see the same ID (a pasted ID often carries stray whitespace).
    repo_id = (repo_id or "").strip()
    if not repo_id:
        return "Status: Please enter a repository ID.", "", gr.update(value="", visible=False)
    try:
        logger.info(f"Loading repository with vectorization: {repo_id}")
        # Download and process the repository (existing logic)
        try:
            download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt'])
            combined_text_path = combine_repo_files_for_llm()
        except Exception as e:
            logger.exception(f"Error downloading repository {repo_id}: {e}")
            error_status = f"β Error downloading repository: {e}"
            return error_status, "", gr.update(value="", visible=False)
        # Read the combined content
        with open(combined_text_path, "r", encoding="utf-8") as f:
            repo_content = f.read()
        # Create vectorized representation
        vectorization_success = vectorize_repository_content(repo_content, repo_id)
        # Get the original context summary
        from analyzer import create_repo_context_summary
        context_summary = create_repo_context_summary(repo_content, repo_id)
        # Update status message. initialize_repo_chatbot() looks for the
        # words "successfully" and "Vector embeddings created" in this text,
        # so keep the wording in sync with it.
        if vectorization_success:
            status = f"β Repository '{repo_id}' loaded successfully!\nπ Files processed and ready for exploration.\nπ Vector embeddings created for semantic search.\nπ¬ You can now ask questions about this repository."
        else:
            status = f"β Repository '{repo_id}' loaded successfully!\nπ Files processed and ready for exploration.\nβ οΈ Vectorization failed - using text-only analysis.\nπ¬ You can now ask questions about this repository."
        # Generate the HTML link for the repository
        repo_link_html = generate_repo_link_html(repo_id)
        logger.info(f"Repository {repo_id} loaded and processed successfully")
        return status, context_summary, gr.update(value=repo_link_html, visible=True)
    except Exception as e:
        logger.exception(f"Error loading repository {repo_id}: {e}")
        error_status = f"β Error loading repository: {e}"
        return error_status, "", gr.update(value="", visible=False)
def initialize_repo_chatbot(repo_status: str, repo_id: str, repo_context_summary: str) -> List[Dict[str, str]]:
    """Build the chatbot's opening history once a repository has been loaded.

    Loading counts as successful when the context summary is non-blank and
    the status text contains "successfully" (case-insensitive). In that case
    a single assistant welcome message is returned, noting whether vector
    search is available (the status text mentions "Vector embeddings
    created"). Otherwise the chatbot stays empty.
    """
    loaded_ok = bool(repo_context_summary.strip()) and "successfully" in repo_status.lower()
    if not loaded_ok:
        # Loading failed: keep the chatbot empty.
        return []

    # Tell the user which kind of analysis backs the answers.
    if "Vector embeddings created" in repo_status:
        vectorization_status = "π **Enhanced with vector search** for finding relevant code sections"
    else:
        vectorization_status = "π **Text-based analysis** (vector search unavailable)"

    welcome_msg = (
        f"π Welcome! I've successfully analyzed the **{repo_id}** repository.\n\n"
        "π§ **I now have comprehensive knowledge of:**\n"
        "β’ All files and code structure\n"
        "β’ Key features and capabilities\n"
        "β’ Installation and usage instructions\n"
        "β’ Architecture and implementation details\n"
        "β’ Dependencies and requirements\n\n"
        f"{vectorization_status}\n\n"
        "π¬ **Ask me anything about this repository!** \n"
        "For example:\n"
        "β’ \"What does this repository do?\"\n"
        "β’ \"How do I install and use it?\"\n"
        "β’ \"What are the main components?\"\n"
        "β’ \"Show me usage examples\"\n\n"
        "What would you like to know? π€"
    )
    return [{"role": "assistant", "content": welcome_msg}]
def setup_repo_explorer_events(components: Dict[str, gr.components.Component], states: Dict[str, gr.State]):
    """Wire the Repo Explorer components to their handlers.

    Two event chains are registered:
    - the load button loads and vectorizes the repository, then stores the
      entered repository ID in state, then seeds the chatbot;
    - submitting the message box or clicking the send button appends the
      user's message, then asks the bot for a reply.
    """
    repo_input = components["repo_explorer_input"]
    status_box = components["repo_status_display"]
    chatbot = components["repo_chatbot"]
    msg_box = components["repo_msg_input"]
    summary_state = states["repo_context_summary"]
    repo_id_state = states["current_repo_id"]

    # Load repository event with vectorization
    load_event = components["load_repo_btn"].click(
        fn=handle_load_repository_with_vectorization,
        inputs=[repo_input],
        outputs=[status_box, summary_state, components["visit_hf_link"]],
    )
    # Copy the entered ID into state so the chat handlers can read it.
    id_stored = load_event.then(
        fn=lambda repo_id: repo_id,
        inputs=[repo_input],
        outputs=[repo_id_state],
    )
    id_stored.then(
        fn=initialize_repo_chatbot,
        inputs=[status_box, repo_id_state, summary_state],
        outputs=[chatbot],
    )

    # Chat message submission events: Enter in the text box and the send
    # button run the same two-step chain.
    for register in (msg_box.submit, components["repo_send_btn"].click):
        user_step = register(
            fn=handle_repo_user_message,
            inputs=[msg_box, chatbot, summary_state, repo_id_state],
            outputs=[chatbot, msg_box],
        )
        user_step.then(
            fn=handle_repo_bot_response,
            inputs=[chatbot, summary_state, repo_id_state],
            outputs=[chatbot],
        )