"""
Simple RAG (Retrieval-Augmented Generation) System using LangChain
"""
import os
import tempfile
from typing import List, Dict, Any, Optional
from pathlib import Path
import uuid
try:
from langchain_community.document_loaders import (
PyPDFLoader,
TextLoader,
UnstructuredWordDocumentLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
LANGCHAIN_AVAILABLE = True
except ImportError:
print("LangChain not installed. Install with: pip install langchain langchain-community langchain-huggingface pypdf python-docx faiss-cpu sentence-transformers")
LANGCHAIN_AVAILABLE = False
# Fallback Document class for type hints
class Document:
def __init__(self, page_content: str, metadata: dict = None):
self.page_content = page_content
self.metadata = metadata or {}


class SimpleRAGSystem:
    def __init__(self):
        """Initialize the RAG system with embeddings and vector store."""
        if not LANGCHAIN_AVAILABLE:
            print("LangChain not available. RAG functionality disabled.")
            self.embeddings = None
            self.vector_store = None
            self.documents_metadata = {}
            self.text_splitter = None
            return

        self.embeddings = None
        self.vector_store = None
        self.documents_metadata = {}
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )

        # Initialize a lightweight, CPU-friendly embedding model
        try:
            self.embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2",
                model_kwargs={'device': 'cpu'}
            )
        except Exception as e:
            print(f"Failed to initialize embeddings: {e}")
            self.embeddings = None

    def _load_document(self, file_path: str, file_type: str) -> List[Document]:
        """Load a document based on its type."""
        if not LANGCHAIN_AVAILABLE:
            return [Document(
                page_content="LangChain not available",
                metadata={"source": file_path, "error": True}
            )]

        try:
            if file_type == 'application/pdf' or file_path.endswith('.pdf'):
                loader = PyPDFLoader(file_path)
            elif file_type == 'text/plain' or file_path.endswith('.txt'):
                loader = TextLoader(file_path, encoding='utf-8')
            elif (file_type == 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
                  or file_path.endswith('.docx')):
                loader = UnstructuredWordDocumentLoader(file_path)
            elif file_path.endswith('.md'):
                loader = TextLoader(file_path, encoding='utf-8')
            else:
                # Fall back to the plain-text loader
                loader = TextLoader(file_path, encoding='utf-8')
            return loader.load()
        except Exception as e:
            print(f"Error loading document {file_path}: {e}")
            # Return a placeholder document carrying the error info
            return [Document(
                page_content=f"Error loading document: {str(e)}",
                metadata={"source": file_path, "error": True}
            )]

    def add_document(self, file_content: bytes, filename: str, file_type: str) -> Dict[str, Any]:
        """Add a document to the RAG system."""
        if not LANGCHAIN_AVAILABLE:
            return {"success": False, "error": "LangChain not available"}
        if not self.embeddings:
            return {"success": False, "error": "Embeddings not initialized"}

        doc_id = str(uuid.uuid4())
        # Write the upload to a temporary file so the loaders can read it
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(filename).suffix) as tmp_file:
            tmp_file.write(file_content)
            tmp_path = tmp_file.name

        try:
            # Load and process the document
            documents = self._load_document(tmp_path, file_type)
            # Surface loader failures instead of indexing the error placeholder
            if any(doc.metadata.get("error") for doc in documents):
                return {"success": False, "error": documents[0].page_content}

            # Split the document into chunks
            texts = self.text_splitter.split_documents(documents)

            # Tag every chunk with its source document
            for text in texts:
                text.metadata.update({
                    "doc_id": doc_id,
                    "filename": filename,
                    "file_type": file_type
                })

            # Create or update the vector store
            if self.vector_store is None:
                self.vector_store = FAISS.from_documents(texts, self.embeddings)
            else:
                self.vector_store.add_documents(texts)

            # Record document metadata
            self.documents_metadata[doc_id] = {
                "filename": filename,
                "file_type": file_type,
                "chunks": len(texts),
                "status": "processed"
            }

            return {
                "success": True,
                "doc_id": doc_id,
                "chunks": len(texts),
                "message": f"Document '{filename}' processed successfully"
            }
        except Exception as e:
            print(f"Error processing document {filename}: {e}")
            return {"success": False, "error": str(e)}
        finally:
            # Always remove the temporary file, even on failure
            os.unlink(tmp_path)

    def remove_document(self, doc_id: str) -> Dict[str, Any]:
        """Remove a document from the RAG system."""
        try:
            if doc_id in self.documents_metadata:
                # Note: FAISS doesn't support removing specific documents easily.
                # In a production system, you'd rebuild the vector store; see
                # the sketch in _rebuild_without below.
                del self.documents_metadata[doc_id]
                return {"success": True, "message": "Document removed"}
            else:
                return {"success": False, "error": "Document not found"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def search_similar(self, query: str, k: int = 3) -> List[Dict[str, Any]]:
        """Search for the stored chunks most similar to a query."""
        if not LANGCHAIN_AVAILABLE:
            return []
        if not self.vector_store:
            return []

        try:
            docs = self.vector_store.similarity_search(query, k=k)
            results = []
            for doc in docs:
                results.append({
                    "content": doc.page_content,
                    "metadata": doc.metadata,
                    "filename": doc.metadata.get("filename", "Unknown")
                })
            return results
        except Exception as e:
            print(f"Error searching documents: {e}")
            return []

    def get_context_for_query(self, query: str, max_chunks: int = 3) -> str:
        """Get relevant context for a query."""
        if not LANGCHAIN_AVAILABLE:
            return ""
        if not self.vector_store:
            return ""

        try:
            similar_docs = self.search_similar(query, k=max_chunks)
            context_parts = []
            for doc in similar_docs:
                context_parts.append(f"From '{doc['filename']}':\n{doc['content']}")
            return "\n\n---\n\n".join(context_parts)
        except Exception as e:
            print(f"Error getting context: {e}")
            return ""

    def get_documents_info(self) -> Dict[str, Any]:
        """Get information about stored documents."""
        return {
            "total_documents": len(self.documents_metadata),
            "documents": self.documents_metadata,
            "vector_store_ready": self.vector_store is not None
        }


# Global RAG system instance
rag_system = SimpleRAGSystem()


def get_rag_system() -> SimpleRAGSystem:
    """Get the global RAG system instance."""
    return rag_system
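

# A minimal usage sketch, assuming the LangChain extras above are installed
# and the embedding model can be downloaded. The filename and sample text
# are illustrative, not part of the module.
if __name__ == "__main__":
    rag = get_rag_system()
    result = rag.add_document(
        file_content=b"FAISS is a library for efficient similarity search.",
        filename="notes.txt",
        file_type="text/plain",
    )
    print(result)
    if result.get("success"):
        print(rag.get_context_for_query("What is FAISS?"))
        print(rag.get_documents_info())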