from typing import Dict, List
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_community.document_loaders import PyMuPDFLoader, TextLoader, UnstructuredURLLoader, WebBaseLoader
from langchain_community.vectorstores import Qdrant
from langchain_core.documents import Document
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.language_models import BaseLanguageModel
import os
import functools
import requests
import tempfile
from chainlit.types import AskFileResponse

def process_file(uploaded_file: AskFileResponse):
    """Write an uploaded PDF or TXT file to a temporary path and load it into documents."""
    if uploaded_file.name.endswith(".pdf"):
        with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".pdf") as temp_file:
            temp_file.write(uploaded_file.content)
            temp_file_path = temp_file.name
        # Load PDF with PyMuPDFLoader
        loader = PyMuPDFLoader(temp_file_path)
    elif uploaded_file.name.endswith(".txt"):
        with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".txt") as temp_file:
            temp_file.write(uploaded_file.content)
            temp_file_path = temp_file.name
        # Load text file with TextLoader
        loader = TextLoader(temp_file_path)
    else:
        raise ValueError("Unsupported file format. Only PDF and TXT are supported.")
    return loader.load()

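# Hedged usage sketch (an addition, not from the original module): process_file is
# meant to receive the result of a Chainlit file prompt. The handler name and the
# accepted MIME types below are illustrative assumptions.
#
#   import chainlit as cl
#
#   @cl.on_chat_start
#   async def start():
#       files = await cl.AskFileMessage(
#           content="Upload a PDF or TXT file to begin.",
#           accept=["application/pdf", "text/plain"],
#       ).send()
#       docs = process_file(files[0])
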
def load_documents_from_url(url):
    """Load documents from a URL: PDFs via PyMuPDFLoader, HTML pages via UnstructuredURLLoader.

    Returns None for video content (including YouTube) or on any loading error.
    """
    try:
        # Check if it's a PDF
        if url.endswith(".pdf"):
            try:
                loader = PyMuPDFLoader(url)
                return loader.load()
            except Exception as e:
                print(f"Error loading PDF from {url}: {e}")
                return None

        # Fetch the headers and check for video content
        try:
            response = requests.head(url, timeout=10)  # Timeout for fetching headers
            content_type = response.headers.get('Content-Type', '')
        except Exception as e:
            print(f"Error fetching headers from {url}: {e}")
            return None

        # Ignore video content (flagged for now)
        if 'video' in content_type:
            return None
        if 'youtube' in url:
            return None

        # Otherwise, treat it as an HTML page
        try:
            loader = UnstructuredURLLoader([url])
            return loader.load()
        except Exception as e:
            print(f"Error loading HTML from {url}: {e}")
            return None
    except Exception as e:
        print(f"General error loading from {url}: {e}")
        return None

def add_to_qdrant(documents, embeddings, qdrant_client, collection_name):
    Qdrant.from_documents(
        documents,
        embeddings,
        url=qdrant_client.url,
        prefer_grpc=True,
        collection_name=collection_name,
    )

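# Hedged ingestion sketch (an addition, not from the original module): one way the
# loaders above can feed add_to_qdrant. The splitter settings, embedding model,
# collection name, and the pre-configured `qdrant_client` are illustrative assumptions.
#
#   from langchain_openai import OpenAIEmbeddings
#   from langchain_text_splitters import RecursiveCharacterTextSplitter
#
#   docs = load_documents_from_url("https://example.com/article") or []
#   chunks = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100).split_documents(docs)
#   add_to_qdrant(chunks, OpenAIEmbeddings(), qdrant_client, "uploaded_docs")
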
def agent_node(state, agent, name):
    """Invoke an agent on the current state and wrap its reply as a named message."""
    result = agent.invoke(state)
    # An AgentExecutor returns its answer under "output"; compiled graphs return an
    # updated "messages" list instead, so fall back to the last message in that case.
    content = result["output"] if "output" in result else result["messages"][-1].content
    return {
        "messages": [HumanMessage(content=content, name=name)]
    }

def create_team_agent(llm, tools, system_prompt, agent_name, team_members):
    """Create a function-calling agent for a team; its prompt expects the team's current files."""
    # agent_name and team_members are accepted for call-site symmetry but are not used here.
    return create_agent(
        llm,
        tools,
        f"{system_prompt}\nBelow are files currently in your directory:\n{{current_files}}",
    )

def create_agent_node(agent, name):
    return functools.partial(agent_node, agent=agent, name=name)

def add_agent_to_graph(graph, agent_name, agent_node):
    graph.add_node(agent_name, agent_node)
    graph.add_edge(agent_name, "supervisor")

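# The sketch below is an assumption, not part of the original module: it provides the
# LLM router that create_team_supervisor (below) delegates to, following the common
# LangGraph supervisor pattern (OpenAI function calling with a "route" function).
# The name build_route_supervisor is introduced here, and it assumes `llm` is an
# OpenAI-functions-capable chat model (e.g. ChatOpenAI).
from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser

def build_route_supervisor(llm, system_prompt, members):
    """Return a chain that picks the next worker (or FINISH) as {"next": ...}."""
    options = ["FINISH"] + list(members)
    function_def = {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "type": "object",
            "properties": {"next": {"enum": options}},
            "required": ["next"],
        },
    }
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            (
                "system",
                "Given the conversation above, who should act next?"
                f" Or should we FINISH? Select one of: {options}",
            ),
        ]
    )
    return (
        prompt
        | llm.bind_functions(functions=[function_def], function_call="route")
        | JsonOutputFunctionsParser()
    )
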
def create_team_supervisor(llm, team_description, team_members):
    """Create the supervisor that routes the conversation between a team's members."""
    return build_route_supervisor(
        llm,
        f"You are a supervisor tasked with managing a conversation between the"
        f" following workers: {', '.join(team_members)}. {team_description}"
        f" When all workers are finished, you must respond with FINISH.",
        team_members,
    )

def enter_chain(message: str, members: List[str]):
    results = {
        "messages": [HumanMessage(content=message)],
        "team_members": ", ".join(members),
    }
    return results

def create_team_chain(graph, team_members):
    return (
        functools.partial(enter_chain, members=team_members)
        | graph.compile()
    )

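# Hedged usage sketch (an addition, not from the original module): how the helpers
# above are typically wired into a LangGraph team. TeamState, llm, and search_tools
# are placeholder names introduced here for illustration. Note that agents built via
# create_team_agent additionally expect a `current_files` value in the team state.
#
#   import operator
#   from typing import Annotated, TypedDict
#   from langchain_openai import ChatOpenAI
#   from langgraph.graph import END, StateGraph
#
#   class TeamState(TypedDict):
#       messages: Annotated[List[BaseMessage], operator.add]
#       team_members: str
#       next: str
#
#   llm = ChatOpenAI(model="gpt-4o-mini")
#   search_agent = create_agent(llm, search_tools, "You are a web research assistant.")
#   graph = StateGraph(TeamState)
#   graph.add_node("supervisor", create_team_supervisor(llm, "You run the research team.", ["Search"]))
#   add_agent_to_graph(graph, "Search", create_agent_node(search_agent, "Search"))
#   graph.add_conditional_edges("supervisor", lambda state: state["next"], {"Search": "Search", "FINISH": END})
#   graph.set_entry_point("supervisor")
#   research_chain = create_team_chain(graph, ["Search"])
#   result = research_chain.invoke("Find recent coverage of vector databases.")
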
def create_agent(
    llm: BaseLanguageModel,
    tools: list,
    system_prompt: str,
) -> AgentExecutor:
    """Create a function-calling agent executor with the shared team instructions."""
    system_prompt += (
        "\nWork autonomously according to your specialty, using the tools available to you."
        " Do not ask for clarification."
        " Your other team members (and other teams) will collaborate with you with their own specialties."
        " You are chosen for a reason! You are one of the following team members: {{team_members}}."
    )
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_functions_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)
    return executor

def format_docs(docs: List[Document]) -> str:
    """Render retrieved documents as a sourced, double-newline-separated context string."""
    return "\n\n".join(
        f"Content: {doc.page_content}\nSource: {doc.metadata.get('source', 'Unknown')}"
        for doc in docs
    )
