Spaces:
Runtime error
Runtime error
| """ | |
| Copyright (c) 2025 Bytedance Ltd. and/or its affiliates | |
| SPDX-License-Identifier: MIT | |
| """ | |
| import copy | |
| import io | |
| import json | |
| import os | |
| import re | |
| from dataclasses import dataclass | |
| from typing import List, Tuple | |
| import cv2 | |
| import numpy as np | |
| import pymupdf | |
| from PIL import Image | |
| from utils.markdown_utils import MarkdownConverter | |
def save_figure_to_local(pil_crop, save_dir, image_name, reading_order):
    """Save a cropped figure as a PNG under ``<save_dir>/markdown/figures``.

    Args:
        pil_crop: PIL Image object of the cropped figure
        save_dir: Base directory to save results
        image_name: Name of the source image/document
        reading_order: Reading order of the figure in the document (formatted as a 3-digit integer)

    Returns:
        str: Filename (not the full path) of the saved figure. If anything fails, the error is
        printed and a ``*_error.png`` placeholder name is returned instead; no file is written
        under that placeholder name.
    """
    try:
        figures_dir = os.path.join(save_dir, "markdown", "figures")
        # Create the target directory on demand so that saving works even when
        # setup_output_dirs() has not been called for this save_dir.
        os.makedirs(figures_dir, exist_ok=True)

        figure_filename = f"{image_name}_figure_{reading_order:03d}.png"
        figure_path = os.path.join(figures_dir, figure_filename)

        pil_crop.save(figure_path, format="PNG", quality=95)
        return figure_filename
    except Exception as e:
        # Best effort: report the problem and hand back a recognisable placeholder name
        print(f"Error saving figure: {str(e)}")
        return f"{image_name}_figure_{reading_order:03d}_error.png"
def convert_pdf_to_images(pdf_path, target_size=896):
    """Convert PDF pages to images

    Each page is rendered with a uniform scale chosen so that its longest side
    equals ``target_size`` pixels.

    Args:
        pdf_path: Path to PDF file
        target_size: Target size for the longest dimension

    Returns:
        List of PIL Images, one per page. If anything fails (including on a
        later page), the error is printed and an empty list is returned.
    """
    images = []
    try:
        # The context manager closes the document even when rendering a page raises
        with pymupdf.open(pdf_path) as doc:
            for page in doc:
                # Calculate scale to make longest dimension equal to target_size
                rect = page.rect
                scale = target_size / max(rect.width, rect.height)

                # Render page as image
                pix = page.get_pixmap(matrix=pymupdf.Matrix(scale, scale))

                # Round-trip through PNG bytes to obtain a PIL Image
                img_data = pix.tobytes("png")
                images.append(Image.open(io.BytesIO(img_data)))

        print(f"Successfully converted {len(images)} pages from PDF")
        return images
    except Exception as e:
        print(f"Error converting PDF to images: {str(e)}")
        return []
def is_pdf_file(file_path):
    """Return True when ``file_path`` ends with ``.pdf``, ignoring letter case."""
    lowered = file_path.lower()
    return lowered.endswith(".pdf")
def save_combined_pdf_results(all_page_results, pdf_path, save_dir):
    """Write the results of a multi-page PDF as one JSON file and one Markdown file

    The JSON goes to ``<save_dir>/recognition_json/<pdf name>.json`` and the
    Markdown to ``<save_dir>/markdown/<pdf name>.md``. Markdown generation is
    best effort: failures are printed and do not affect the JSON output.

    Args:
        all_page_results: List of per-page result dicts; each page's ``"elements"`` list feeds the Markdown
        pdf_path: Path to original PDF file
        save_dir: Directory to save results

    Returns:
        Path to saved combined JSON file
    """
    stem, _ = os.path.splitext(os.path.basename(pdf_path))

    # --- JSON -----------------------------------------------------------
    payload = {
        "source_file": pdf_path,
        "total_pages": len(all_page_results),
        "pages": all_page_results,
    }
    json_path = os.path.join(save_dir, "recognition_json", f"{stem}.json")
    os.makedirs(os.path.dirname(json_path), exist_ok=True)
    with open(json_path, "w", encoding="utf-8") as json_file:
        json.dump(payload, json_file, indent=2, ensure_ascii=False)

    # --- Markdown (best effort) -------------------------------------------
    try:
        converter = MarkdownConverter()

        # Flatten the pages into a single element list, with a separator
        # element between consecutive non-empty pages
        merged = []
        for page in all_page_results:
            elements = page.get("elements", [])
            if not elements:
                continue
            if merged:
                merged.append({"label": "page_separator", "text": "\n\n---\n\n", "reading_order": len(merged)})
            merged.extend(elements)

        markdown_text = converter.convert(merged)

        markdown_path = os.path.join(save_dir, "markdown", f"{stem}.md")
        os.makedirs(os.path.dirname(markdown_path), exist_ok=True)
        with open(markdown_path, "w", encoding="utf-8") as markdown_file:
            markdown_file.write(markdown_text)
    except ImportError:
        print("MarkdownConverter not available, skipping markdown generation")
    except Exception as e:
        print(f"Error generating markdown: {e}")

    return json_path
def check_coord_valid(x1, y1, x2, y2, image_size=None, abs_coord=True):
    """Validate a bounding box

    A box is rejected when it has no positive width/height, when its top-left
    corner is negative, or when its bottom-right corner exceeds the allowed
    range: ``1`` for relative coordinates (``abs_coord=False``), or
    ``image_size`` (indexed as ``[0]`` for x and ``[1]`` for y) for absolute
    coordinates when an image size is given.

    Returns:
        tuple: ``(True, None)`` for a valid box, otherwise ``(False, "[x1, y1, x2, y2]")``
    """
    if x1 >= x2 or y1 >= y2 or x1 < 0 or y1 < 0:
        is_valid = False
    elif not abs_coord:
        # Relative coordinates must stay inside the unit square
        is_valid = not (x2 > 1 or y2 > 1)
    elif image_size is None:
        # Absolute coordinates without a known image size: nothing more to check
        is_valid = True
    else:
        is_valid = not (x2 > image_size[0] or y2 > image_size[1])

    if is_valid:
        return True, None
    return False, f"[{x1}, {y1}, {x2}, {y2}]"
def adjust_box_edges(image, boxes: List[List[float]], max_pixels=15, threshold=0.2):
    """Move box edges outward, one pixel at a time, toward lines with a lower transition score

    The image is binarized (inverted Otsu threshold) and every edge of a box is
    scored by the summed differences between neighbouring binarized pixels along
    that edge, divided by the number of differences. An edge whose score is
    above ``threshold`` is pushed outward for up to ``max_pixels`` steps.

    Args:
        image: BGR image array, or a path string that is read with ``cv2.imread``.
        boxes: List of boxes ``[[x1, y1, x2, y2]]`` in absolute pixel coordinates.
            The values are used directly as array indices, so they need to be integers.
        max_pixels: Maximum number of one-pixel steps tried per edge.
        threshold: Score at or below which an edge is accepted and the search for it stops.

    Returns:
        A new list with one box per input box. Input boxes are not modified.
    """
    if isinstance(image, str):
        image = cv2.imread(image)
    img_h, img_w = image.shape[:2]

    boxes = list(boxes)
    if not boxes:
        return []

    # Binarize once per call. The image does not change during the search, so
    # redoing the grayscale conversion and Otsu threshold for every single edge
    # probe would only repeat identical work.
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    def edge_score(current_box, i, is_vertical):
        """Score the pixel line under edge ``i`` of ``current_box``."""
        edge = current_box[i]
        if is_vertical:
            line = binary[current_box[1] : current_box[3] + 1, edge]
        else:
            line = binary[edge, current_box[0] : current_box[2] + 1]
        # NOTE(review): `binary` is uint8, so np.diff wraps around: a 0->255 step
        # contributes 255 but a 255->0 step contributes 1. Kept as is so results
        # stay unchanged -- confirm whether the asymmetry is intended.
        transitions = np.abs(np.diff(line))
        return np.sum(transitions) / len(transitions)

    # (index into the box, step direction, whether the edge is a vertical line).
    # Directions only ever widen the box.
    edges = [(0, -1, True), (2, 1, True), (1, -1, False), (3, 1, False)]

    new_boxes = []
    for box in boxes:
        best_box = copy.deepcopy(box)
        current_box = copy.deepcopy(box)

        # make sure the box is within the image
        current_box[0] = min(max(current_box[0], 0), img_w - 1)
        current_box[1] = min(max(current_box[1], 0), img_h - 1)
        current_box[2] = min(max(current_box[2], 0), img_w - 1)
        current_box[3] = min(max(current_box[3], 0), img_h - 1)

        for i, direction, is_vertical in edges:
            best_score = edge_score(current_box, i, is_vertical)
            if best_score <= threshold:
                continue
            # NOTE(review): when the loop ends without reaching the threshold,
            # current_box[i] stays at the last probed position, and a later
            # improvement on another edge copies that position into best_box.
            # Behaviour preserved -- confirm it is intended.
            for _ in range(max_pixels):
                current_box[i] += direction
                if i == 0 or i == 2:
                    current_box[i] = min(max(current_box[i], 0), img_w - 1)
                else:
                    current_box[i] = min(max(current_box[i], 0), img_h - 1)
                score = edge_score(current_box, i, is_vertical)
                if score < best_score:
                    best_score = score
                    best_box = copy.deepcopy(current_box)
                if score <= threshold:
                    break
        new_boxes.append(best_box)
    return new_boxes
def parse_layout_string(bbox_str):
    """Extract ``([x1, y1, x2, y2], label)`` pairs from a layout string

    Every occurrence of four comma-separated unsigned numbers in square
    brackets followed by a word is returned, in order of appearance, with the
    numbers converted to floats. Text that does not match is ignored.
    """
    box_with_label = re.compile(r"\[(\d*\.?\d+),\s*(\d*\.?\d+),\s*(\d*\.?\d+),\s*(\d*\.?\d+)\]\s*(\w+)")
    return [
        ([float(number) for number in found.groups()[:4]], found.group(5).strip())
        for found in box_with_label.finditer(bbox_str)
    ]
@dataclass
class ImageDimensions:
    """Class to store image dimensions

    Declared as a dataclass so that it can be constructed with keyword
    arguments, e.g. ``ImageDimensions(original_w=..., original_h=..., padded_w=..., padded_h=...)``.
    """

    # Size of the image before padding
    original_w: int
    original_h: int
    # Size of the image after padding
    padded_w: int
    padded_h: int
def map_to_original_coordinates(x1, y1, x2, y2, dims: ImageDimensions) -> Tuple[int, int, int, int]:
    """Map coordinates from padded image back to original image

    The padding offsets are taken as half of the size difference between the
    padded and the original image (integer division), i.e. the original image
    is treated as centred inside the padded one.

    Args:
        x1, y1, x2, y2: Coordinates in padded image
        dims: Image dimensions object

    Returns:
        tuple: (x1, y1, x2, y2) coordinates in original image, clamped to the
        image. On error a message is printed and a box of at most 100x100
        anchored at the origin is returned.
    """
    try:
        # Calculate padding offsets
        top = (dims.padded_h - dims.original_h) // 2
        left = (dims.padded_w - dims.original_w) // 2

        # Clamp the top-left corner to the last valid pixel. Without the upper
        # bound, a box lying in the right/bottom padding would start beyond
        # the image and come out inverted (x1 > x2 or y1 > y2).
        orig_x1 = min(max(0, x1 - left), max(dims.original_w - 1, 0))
        orig_y1 = min(max(0, y1 - top), max(dims.original_h - 1, 0))
        orig_x2 = min(dims.original_w, x2 - left)
        orig_y2 = min(dims.original_h, y2 - top)

        # Ensure we have a valid box (width and height > 0)
        if orig_x2 <= orig_x1:
            orig_x2 = min(orig_x1 + 1, dims.original_w)
        if orig_y2 <= orig_y1:
            orig_y2 = min(orig_y1 + 1, dims.original_h)

        return int(orig_x1), int(orig_y1), int(orig_x2), int(orig_y2)
    except Exception as e:
        print(f"map_to_original_coordinates error: {str(e)}")
        # Return safe coordinates
        return 0, 0, min(100, dims.original_w), min(100, dims.original_h)
def map_to_relevant_coordinates(abs_coords, dims: ImageDimensions):
    """Scale absolute coordinates to fractions of the original image size

    x values are divided by ``dims.original_w`` and y values by
    ``dims.original_h``; each result is rounded to 3 decimals.
    e.g. [100, 100, 200, 200] on a 1000x500 image -> (0.1, 0.2, 0.2, 0.4)

    On any error a message is printed and the full-image box
    ``(0.0, 0.0, 1.0, 1.0)`` is returned.
    """
    try:
        left, top, right, bottom = abs_coords
        width, height = dims.original_w, dims.original_h
        value_and_extent = ((left, width), (top, height), (right, width), (bottom, height))
        return tuple(round(value / extent, 3) for value, extent in value_and_extent)
    except Exception as e:
        print(f"map_to_relevant_coordinates error: {str(e)}")
        return 0.0, 0.0, 1.0, 1.0  # Return full image coordinates
def process_coordinates(coords, padded_image, dims: ImageDimensions, previous_box=None):
    """Turn normalized coordinates into an adjusted box in padded and original image space

    Steps: scale to the padded image, clamp, let ``adjust_box_edges`` move the
    edges, clamp again, push the top edge below ``previous_box`` if the two
    overlap, and finally map the box to the original image.

    Args:
        coords: Normalized coordinates [x1, y1, x2, y2]
        padded_image: Padded image
        dims: Image dimensions object
        previous_box: Previous box coordinates for overlap adjustment

    Returns:
        tuple: (x1, y1, x2, y2, orig_x1, orig_y1, orig_x2, orig_y2, new_previous_box).
        On error a message is printed and fixed fallback values are returned.
    """
    try:
        pad_w, pad_h = dims.padded_w, dims.padded_h

        def clamp_box(bx1, by1, bx2, by2):
            """Clamp a box to the padded image and keep it at least 1 pixel wide and high."""
            bx1 = max(0, min(bx1, pad_w - 1))
            by1 = max(0, min(by1, pad_h - 1))
            bx2 = max(0, min(bx2, pad_w))
            by2 = max(0, min(by2, pad_h))
            if bx2 <= bx1:
                bx2 = min(bx1 + 1, pad_w)
            if by2 <= by1:
                by2 = min(by1 + 1, pad_h)
            return bx1, by1, bx2, by2

        # Normalized -> absolute pixels in the padded image, then clamp
        x1, y1, x2, y2 = clamp_box(
            int(coords[0] * pad_w),
            int(coords[1] * pad_h),
            int(coords[2] * pad_w),
            int(coords[3] * pad_h),
        )

        # Extend box boundaries, then clamp once more since the edges may have moved
        adjusted = adjust_box_edges(padded_image, [[x1, y1, x2, y2]])[0]
        x1, y1, x2, y2 = clamp_box(*adjusted)

        # If this box overlaps the previous one, start it at the previous box's bottom edge
        if previous_box is not None:
            prev_x1, prev_y1, prev_x2, prev_y2 = previous_box
            if (x1 < prev_x2 and x2 > prev_x1) and (y1 < prev_y2 and y2 > prev_y1):
                y1 = min(prev_y2, pad_h - 1)
                # Keep y2 strictly below y1 after the shift
                if y2 <= y1:
                    y2 = min(y1 + 1, pad_h)

        new_previous_box = [x1, y1, x2, y2]
        orig_x1, orig_y1, orig_x2, orig_y2 = map_to_original_coordinates(x1, y1, x2, y2, dims)
        return x1, y1, x2, y2, orig_x1, orig_y1, orig_x2, orig_y2, new_previous_box
    except Exception as e:
        print(f"process_coordinates error: {str(e)}")
        # Return safe values
        orig_x1, orig_y1, orig_x2, orig_y2 = 0, 0, min(100, dims.original_w), min(100, dims.original_h)
        return 0, 0, 100, 100, orig_x1, orig_y1, orig_x2, orig_y2, [0, 0, 100, 100]
def prepare_image(image) -> Tuple[np.ndarray, ImageDimensions]:
    """Load and prepare image with padding while maintaining aspect ratio

    The image is converted to a BGR array and padded with black borders,
    split as evenly as possible between both sides, into a square whose side
    is the longer image dimension.

    Args:
        image: PIL image

    Returns:
        tuple: (padded_image, image_dimensions). On error a message is printed
        and an unpadded all-black image of the input's size is returned, with
        the padded dimensions equal to the original ones.
    """
    try:
        # Normalise to 3-channel RGB first so that grayscale / palette inputs
        # are valid input for the RGB->BGR conversion below.
        rgb = image if image.mode == "RGB" else image.convert("RGB")

        # Convert PIL image to OpenCV format. A separate name is used on purpose:
        # the except branch still needs the PIL image's .width / .height.
        bgr = cv2.cvtColor(np.array(rgb), cv2.COLOR_RGB2BGR)
        original_h, original_w = bgr.shape[:2]

        # Calculate padding to make square image
        max_size = max(original_h, original_w)
        top = (max_size - original_h) // 2
        bottom = max_size - original_h - top
        left = (max_size - original_w) // 2
        right = max_size - original_w - left

        # Apply padding
        padded_image = cv2.copyMakeBorder(bgr, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(0, 0, 0))
        padded_h, padded_w = padded_image.shape[:2]

        dimensions = ImageDimensions(original_w=original_w, original_h=original_h, padded_w=padded_w, padded_h=padded_h)
        return padded_image, dimensions
    except Exception as e:
        print(f"prepare_image error: {str(e)}")
        # Create a minimal valid image and dimensions
        h, w = image.height, image.width
        dimensions = ImageDimensions(original_w=w, original_h=h, padded_w=w, padded_h=h)
        # Return a black image of the same size
        return np.zeros((h, w, 3), dtype=np.uint8), dimensions
def setup_output_dirs(save_dir):
    """Make sure ``save_dir`` and its ``markdown``, ``recognition_json`` and ``markdown/figures`` subdirectories exist"""
    os.makedirs(save_dir, exist_ok=True)
    for parts in (("markdown",), ("recognition_json",), ("markdown", "figures")):
        os.makedirs(os.path.join(save_dir, *parts), exist_ok=True)
def save_outputs(recognition_results, image_path, save_dir):
    """Save JSON and markdown outputs

    Writes ``<save_dir>/recognition_json/<image name>.json`` and
    ``<save_dir>/markdown/<image name>.md``, creating both directories if
    they do not exist yet.

    Args:
        recognition_results: JSON-serializable results; also passed to ``MarkdownConverter.convert``
        image_path: Path of the source image; its base name (without extension) names the outputs
        save_dir: Base directory to save results

    Returns:
        Path to the saved JSON file
    """
    basename = os.path.splitext(os.path.basename(image_path))[0]

    # Save JSON file
    json_path = os.path.join(save_dir, "recognition_json", f"{basename}.json")
    # Create the directory on demand, matching save_combined_pdf_results
    os.makedirs(os.path.dirname(json_path), exist_ok=True)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(recognition_results, f, ensure_ascii=False, indent=2)

    # Generate and save markdown file
    markdown_converter = MarkdownConverter()
    markdown_content = markdown_converter.convert(recognition_results)
    markdown_path = os.path.join(save_dir, "markdown", f"{basename}.md")
    os.makedirs(os.path.dirname(markdown_path), exist_ok=True)
    with open(markdown_path, "w", encoding="utf-8") as f:
        f.write(markdown_content)

    return json_path
def crop_margin(img: Image.Image) -> Image.Image:
    """Crop the image to the bounding box of its darker content

    The image is converted to grayscale, contrast-stretched to 0-255, and
    every pixel below 200 counts as content. The original image is returned
    unchanged when it is empty, has a single uniform gray level, contains no
    content pixels, or when any error occurs (the error is printed).
    """
    try:
        width, height = img.size
        if width == 0 or height == 0:
            print("Warning: Image has zero width or height")
            return img

        pixels = np.array(img.convert("L")).astype(np.uint8)
        brightest = pixels.max()
        darkest = pixels.min()
        if brightest == darkest:
            # Uniform image: nothing to separate from the margin
            return img

        # Stretch contrast to the full range, then mark everything darker than 200
        stretched = (pixels - darkest) / (brightest - darkest) * 255
        content_mask = 255 * (stretched < 200).astype(np.uint8)

        points = cv2.findNonZero(content_mask)  # Find all non-zero points (text)
        if points is None:
            return img

        left, top, box_w, box_h = cv2.boundingRect(points)  # Find minimum spanning bounding box

        # Ensure crop coordinates are within image bounds
        left = max(0, left)
        top = max(0, top)
        box_w = min(box_w, width - left)
        box_h = min(box_h, height - top)

        # Only crop if we have a valid region
        if box_w <= 0 or box_h <= 0:
            return img
        return img.crop((left, top, left + box_w, top + box_h))
    except Exception as e:
        print(f"crop_margin error: {str(e)}")
        return img  # Return original image on error