import re
from typing import Dict, List, Optional

from markitdown import MarkItDown
from langchain.schema import Document

from src.processor.text_utils import VietnameseTextProcessor
class WordDocumentProcessor:
    """Process Word document content with semantic chunking using MarkItDown."""

    def __init__(self):
        self.text_processor = VietnameseTextProcessor()
        self.md_converter = MarkItDown()

        # Section patterns for Vietnamese content
        self.section_patterns = {
            "policy": r"(?:quy định|chính sách|policy)",
            "procedure": r"(?:hướng dẫn|cách|thao tác|bước)",
            "scope": r"(?:phạm vi|áp dụng|scope)",
            "fee": r"(?:phí|chiết khấu|fee|discount)",
            "timeout": r"(?:timeout|treo|đang xử lý|processing)",
        }
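    # Example: a header such as "Quy định về phí chiết khấu" matches both the
    # "policy" and "fee" patterns; _determine_section_type (below) returns the
    # first match in dict insertion order, so the section is typed "policy".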
    def process_word_document(self, file_path: str) -> List[Document]:
        """Process a Word document with semantic section chunking using MarkItDown."""
        try:
            # Convert the document to Markdown using MarkItDown
            result = self.md_converter.convert(file_path)
            content = result.text_content
            print(f"Document converted successfully. Content length: {len(content)}")
            print(f"First 500 characters:\n{content[:500]}...")
        except Exception as e:
            print(f"Error converting document with MarkItDown: {e}")
            return []

        documents = []

        # Extract semantic sections
        sections = self._extract_semantic_sections(content)

        for section in sections:
            processed_document = self._process_section(section, file_path)
            if processed_document:
                documents.append(processed_document)

        return documents
    def _extract_semantic_sections(self, content: str) -> List[Dict]:
        """Extract meaningful sections from the Markdown produced by MarkItDown."""
        sections = []

        # MarkItDown emits proper Markdown, so split on ATX headers:
        # a newline followed by one to six "#" characters and whitespace.
        header_pattern = r"\n(?=#{1,6}\s+)"
        major_sections = re.split(header_pattern, content)

        for section_text in major_sections:
            if len(section_text.strip()) < 30:  # Skip very short sections
                continue

            section_info = self._analyze_markdown_section(section_text)
            if section_info:
                sections.append(section_info)

        # If no clear sections were found, fall back to paragraph chunks
        if not sections:
            sections = self._fallback_paragraph_chunking(content)

        return sections
    def _analyze_markdown_section(self, section_text: str) -> Optional[Dict]:
        """Analyze a Markdown section to determine its type and extract content."""
        stripped = section_text.strip()
        if not stripped:
            return None
        lines = stripped.split("\n")

        # Find the header line (starts with #)
        header = ""
        body_start_idx = 0

        for i, line in enumerate(lines):
            if line.strip().startswith("#"):
                header = line.strip()
                body_start_idx = i + 1
                break

        # If no header was found, use the first line as the header
        if not header:
            header = lines[0].strip()
            body_start_idx = 1

        # Get the body content (remaining lines)
        body_lines = lines[body_start_idx:] if body_start_idx < len(lines) else []
        body = "\n".join(body_lines).strip()

        # Skip sections whose body is too short or is mostly file references
        if len(body) < 20 or (
            len(body.split()) < 10
            and any(word in body.lower() for word in ["csv", "file", "document"])
        ):
            return None

        # Determine the section type
        section_type = self._determine_section_type(header + " " + body)

        return {
            "header": header,
            "body": body,
            "type": section_type,
            "full_text": stripped,
        }
    def _fallback_paragraph_chunking(self, content: str) -> List[Dict]:
        """Chunk content by paragraphs when no clear sections are found."""
        paragraphs = [
            p.strip()
            for p in content.split("\n\n")
            if p.strip() and len(p.strip()) > 50
        ]

        sections = []
        for i, paragraph in enumerate(paragraphs):
            # Use the first sentence as the header, falling back to a numbered label
            first_sentence = paragraph.split(".")[0].strip()
            header = first_sentence if first_sentence else f"Section {i + 1}"

            section_type = self._determine_section_type(paragraph)

            sections.append(
                {
                    "header": header,
                    "body": paragraph,
                    "type": section_type,
                    "full_text": paragraph,
                }
            )

        return sections
    def _determine_section_type(self, text: str) -> str:
        """Determine the section type based on its content."""
        text_lower = text.lower()

        for section_type, pattern in self.section_patterns.items():
            if re.search(pattern, text_lower):
                return section_type

        # Default to general content
        return "general"
    def _process_section(self, section: Dict, source_file: str) -> Document:
        """Process an individual section into a chunk, preserving Markdown structure."""
        header = section["header"]
        body = section["body"]
        section_type = section["type"]

        # Strip leading "#" characters from Markdown headers but keep the text;
        # for non-Markdown headers the regex matches nothing and leaves them intact.
        clean_header = re.sub(r"^#+\s*", "", header).strip()
        content = f"{clean_header}\n\n{body}".strip()

        # Remove excessive whitespace and normalize
        content = re.sub(r"\n\s*\n\s*\n", "\n\n", content)  # Collapse triple+ newlines
        content = re.sub(r"[ \t]+", " ", content)  # Normalize spaces and tabs

        return Document(
            page_content=content,
            metadata={
                "doc_type": section_type,
                "section_header": clean_header,
                "category": "word_document",
                "has_markdown": (
                    "yes"
                    if any(marker in content for marker in ["#", "*", "|", "```"])
                    else "no"
                ),
                "content_length": len(content),
                "source_file": source_file,
            },
        )
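

# --- Usage sketch ---
# A minimal, illustrative driver showing the end-to-end flow, assuming
# MarkItDown can read the given file. The path "data/payment_policy.docx"
# is a hypothetical placeholder, not a file shipped with this module.
if __name__ == "__main__":
    processor = WordDocumentProcessor()
    chunks = processor.process_word_document("data/payment_policy.docx")
    for doc in chunks:
        print(f"[{doc.metadata['doc_type']}] {doc.metadata['section_header']}")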