Spaces:
Running
Running
| import os | |
| from typing import List | |
| import PyPDF2 | |
| class TextFileLoader: | |
| def __init__(self, path: str, encoding: str = "utf-8"): | |
| self.documents = [] | |
| self.path = path | |
| self.encoding = encoding | |
| def load(self): | |
| if os.path.isdir(self.path): | |
| self.load_directory() | |
| elif os.path.isfile(self.path) and self.path.endswith(".txt"): | |
| self.load_file() | |
| else: | |
| raise ValueError( | |
| "Provided path is neither a valid directory nor a .txt file." | |
| ) | |
| def load_file(self): | |
| with open(self.path, "r", encoding=self.encoding) as f: | |
| self.documents.append(f.read()) | |
| def load_directory(self): | |
| for root, _, files in os.walk(self.path): | |
| for file in files: | |
| if file.endswith(".txt"): | |
| with open( | |
| os.path.join(root, file), "r", encoding=self.encoding | |
| ) as f: | |
| self.documents.append(f.read()) | |
| def load_documents(self): | |
| self.load() | |
| return self.documents | |
| class CharacterTextSplitter: | |
| def __init__( | |
| self, | |
| chunk_size: int = 1000, | |
| chunk_overlap: int = 200, | |
| ): | |
| assert ( | |
| chunk_size > chunk_overlap | |
| ), "Chunk size must be greater than chunk overlap" | |
| self.chunk_size = chunk_size | |
| self.chunk_overlap = chunk_overlap | |
| def split(self, text: str) -> List[str]: | |
| paragraphs = text.split('\n\n') | |
| chunks = [] | |
| current_chunk = "" | |
| for paragraph in paragraphs: | |
| if len(current_chunk) + len(paragraph) > self.chunk_size: | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| if len(paragraph) > self.chunk_size: | |
| words = paragraph.split() | |
| current_chunk = "" | |
| for word in words: | |
| if len(current_chunk) + len(word) + 1 > self.chunk_size: | |
| chunks.append(current_chunk.strip()) | |
| current_chunk = word | |
| else: | |
| current_chunk += " " + word if current_chunk else word | |
| else: | |
| current_chunk = paragraph | |
| else: | |
| if current_chunk: | |
| current_chunk += "\n\n" + paragraph | |
| else: | |
| current_chunk = paragraph | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| final_chunks = [] | |
| for chunk in chunks: | |
| if len(chunk) > 8000: | |
| words = chunk.split() | |
| current = "" | |
| for word in words: | |
| if len(current) + len(word) + 1 > 8000: | |
| final_chunks.append(current.strip()) | |
| current = word | |
| else: | |
| current += " " + word if current else word | |
| if current: | |
| final_chunks.append(current.strip()) | |
| else: | |
| final_chunks.append(chunk) | |
| return final_chunks | |
| def split_texts(self, texts: List[str]) -> List[str]: | |
| chunks = [] | |
| for text in texts: | |
| chunks.extend(self.split(text)) | |
| return chunks | |
| class PDFLoader: | |
| def __init__(self, path: str): | |
| self.documents = [] | |
| self.path = path | |
| print(f"PDFLoader initialized with path: {self.path}") | |
| def load(self): | |
| print(f"Loading PDF from path: {self.path}") | |
| print(f"Path exists: {os.path.exists(self.path)}") | |
| print(f"Is file: {os.path.isfile(self.path)}") | |
| print(f"Is directory: {os.path.isdir(self.path)}") | |
| print(f"File permissions: {oct(os.stat(self.path).st_mode)[-3:]}") | |
| try: | |
| # Try to open the file first to verify access | |
| with open(self.path, 'rb') as test_file: | |
| pass | |
| # If we can open it, proceed with loading | |
| self.load_file() | |
| except IOError as e: | |
| print(f"IOError while accessing file: {str(e)}") | |
| raise ValueError(f"Cannot access file at '{self.path}': {str(e)}") | |
| except Exception as e: | |
| print(f"Unexpected error while processing file: {str(e)}") | |
| raise ValueError(f"Error processing file at '{self.path}': {str(e)}") | |
| def load_file(self): | |
| with open(self.path, 'rb') as file: | |
| # Create PDF reader object | |
| pdf_reader = PyPDF2.PdfReader(file) | |
| print(f"PDF loaded successfully. Number of pages: {len(pdf_reader.pages)}") | |
| # Extract text from each page | |
| text = "" | |
| for i, page in enumerate(pdf_reader.pages): | |
| page_text = page.extract_text() | |
| if not page_text.strip(): | |
| print(f"Warning: Page {i+1} appears to be empty or unreadable") | |
| text += page_text + "\n" | |
| print(f"Processed page {i+1}, extracted {len(page_text)} characters") | |
| if not text.strip(): | |
| print("Warning: No text was extracted from the PDF") | |
| else: | |
| print(f"Successfully extracted {len(text)} characters of text") | |
| self.documents.append(text) | |
| def load_directory(self): | |
| for root, _, files in os.walk(self.path): | |
| for file in files: | |
| if file.lower().endswith('.pdf'): | |
| file_path = os.path.join(root, file) | |
| with open(file_path, 'rb') as f: | |
| pdf_reader = PyPDF2.PdfReader(f) | |
| # Extract text from each page | |
| text = "" | |
| for page in pdf_reader.pages: | |
| text += page.extract_text() + "\n" | |
| self.documents.append(text) | |
| def load_documents(self): | |
| self.load() | |
| return self.documents | |
| if __name__ == "__main__": | |
| loader = TextFileLoader("data/KingLear.txt") | |
| loader.load() | |
| splitter = CharacterTextSplitter() | |
| chunks = splitter.split_texts(loader.documents) | |
| print(len(chunks)) | |
| print(chunks[0]) | |
| print("--------") | |
| print(chunks[1]) | |
| print("--------") | |
| print(chunks[-2]) | |
| print("--------") | |
| print(chunks[-1]) | |