|
|
import os |
|
|
import random |
|
|
import uuid |
|
|
import json |
|
|
import time |
|
|
import asyncio |
|
|
from threading import Thread |
|
|
from typing import Iterable |
|
|
|
|
|
import gradio as gr |
|
|
import spaces |
|
|
import torch |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import cv2 |
|
|
|
|
|
from transformers import ( |
|
|
Qwen2VLForConditionalGeneration, |
|
|
Qwen2_5_VLForConditionalGeneration, |
|
|
AutoModelForImageTextToText, |
|
|
AutoProcessor, |
|
|
TextIteratorStreamer, |
|
|
) |
|
|
from transformers.image_utils import load_image |
|
|
from gradio.themes import Soft |
|
|
from gradio.themes.utils import colors, fonts, sizes |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Attach a custom "thistle" palette to gradio's ``colors`` module; it is used
# below as the default ``secondary_hue`` of ``ThistleTheme``.
colors.thistle = colors.Color(
    name="thistle",
    c50="#F9F5F9", c100="#F0E8F1", c200="#E7DBE8", c300="#DECEE0",
    c400="#D2BFD8", c500="#D8BFD8", c600="#B59CB7", c700="#927996",
    c800="#6F5675", c900="#4C3454", c950="#291233",
)

# Second custom palette, registered the same way. It is not referenced by any
# code visible in this file.
colors.red_gray = colors.Color(
    name="red_gray",
    c50="#f7eded",
    c100="#f5dcdc",
    c200="#efb4b4",
    c300="#e78f8f",
    c400="#d96a6a",
    c500="#c65353",
    c600="#b24444",
    c700="#8f3434",
    c800="#732d2d",
    c900="#5f2626",
    c950="#4d2020",
)
|
|
|
|
|
class ThistleTheme(Soft):
    """Gradio ``Soft`` theme variant using gray as primary and thistle as secondary hue.

    All constructor arguments are keyword-only and are forwarded unchanged to
    ``Soft.__init__``; afterwards a fixed set of CSS-variable overrides is
    applied through ``set``.
    """

    def __init__(
        self,
        *,
        primary_hue: colors.Color | str = colors.gray,
        secondary_hue: colors.Color | str = colors.thistle,
        neutral_hue: colors.Color | str = colors.slate,
        text_size: sizes.Size | str = sizes.text_lg,
        font: fonts.Font | str | Iterable[fonts.Font | str] = (
            fonts.GoogleFont("Outfit"), "Arial", "sans-serif",
        ),
        font_mono: fonts.Font | str | Iterable[fonts.Font | str] = (
            fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace",
        ),
    ):
        # Base palette / typography handed straight to the parent theme.
        base_options = {
            "primary_hue": primary_hue,
            "secondary_hue": secondary_hue,
            "neutral_hue": neutral_hue,
            "text_size": text_size,
            "font": font,
            "font_mono": font_mono,
        }
        super().__init__(**base_options)

        overrides = {
            # Page and panel backgrounds.
            "background_fill_primary": "*primary_50",
            "background_fill_primary_dark": "*primary_900",
            "body_background_fill": "linear-gradient(135deg, *primary_200, *primary_100)",
            "body_background_fill_dark": "linear-gradient(135deg, *primary_900, *primary_800)",
            # Primary buttons.
            "button_primary_text_color": "black",
            "button_primary_text_color_hover": "white",
            "button_primary_background_fill": "linear-gradient(90deg, *secondary_400, *secondary_500)",
            "button_primary_background_fill_hover": "linear-gradient(90deg, *secondary_500, *secondary_600)",
            "button_primary_background_fill_dark": "linear-gradient(90deg, *secondary_600, *secondary_700)",
            "button_primary_background_fill_hover_dark": "linear-gradient(90deg, *secondary_500, *secondary_600)",
            # Secondary buttons.
            "button_secondary_text_color": "black",
            "button_secondary_text_color_hover": "white",
            "button_secondary_background_fill": "linear-gradient(90deg, *primary_300, *primary_300)",
            "button_secondary_background_fill_hover": "linear-gradient(90deg, *primary_400, *primary_400)",
            "button_secondary_background_fill_dark": "linear-gradient(90deg, *primary_500, *primary_600)",
            "button_secondary_background_fill_hover_dark": "linear-gradient(90deg, *primary_500, *primary_500)",
            # Sliders.
            "slider_color": "*secondary_400",
            "slider_color_dark": "*secondary_600",
            # Blocks, shadows and spacing.
            "block_title_text_weight": "600",
            "block_border_width": "3px",
            "block_shadow": "*shadow_drop_lg",
            "button_primary_shadow": "*shadow_drop_lg",
            "button_large_padding": "11px",
            "color_accent_soft": "*primary_100",
            "block_label_background_fill": "*primary_200",
        }
        super().set(**overrides)


# Single shared theme instance passed to ``gr.Blocks`` below.
thistle_theme = ThistleTheme()
|
|
|
|
|
# Custom stylesheet handed to ``gr.Blocks(css=...)``.
#
# Only ``#main-title`` and ``#output-title`` match elem_ids created in this
# file; the remaining rules target classes that no visible component assigns.
#
# Cascade fix: the two height-based media queries at the end used to be
# ordered 932px -> 1280px, with the 932px rule using the weaker ``.chatbot``
# selector. Every viewport matching ``max-height: 932px`` also matches
# ``max-height: 1280px``, so the later, more specific 1280px rule always won
# and the 932px rule was dead. They are now ordered widest-first and share the
# same selector so the narrower query can take effect.
css = """
#main-title h1 {
    font-size: 2.3em !important;
}
#output-title h2 {
    font-size: 2.1em !important;
}
:root {
    --color-grey-50: #f9fafb;
    --banner-background: var(--secondary-400);
    --banner-text-color: var(--primary-100);
    --banner-background-dark: var(--secondary-800);
    --banner-text-color-dark: var(--primary-100);
    --banner-chrome-height: calc(16px + 43px);
    --chat-chrome-height-wide-no-banner: 320px;
    --chat-chrome-height-narrow-no-banner: 450px;
    --chat-chrome-height-wide: calc(var(--chat-chrome-height-wide-no-banner) + var(--banner-chrome-height));
    --chat-chrome-height-narrow: calc(var(--chat-chrome-height-narrow-no-banner) + var(--banner-chrome-height));
}
.banner-message { background-color: var(--banner-background); padding: 5px; margin: 0; border-radius: 5px; border: none; }
.banner-message-text { font-size: 13px; font-weight: bolder; color: var(--banner-text-color) !important; }
body.dark .banner-message { background-color: var(--banner-background-dark) !important; }
body.dark .gradio-container .contain .banner-message .banner-message-text { color: var(--banner-text-color-dark) !important; }
.toast-body { background-color: var(--color-grey-50); }
.html-container:has(.css-styles) { padding: 0; margin: 0; }
.css-styles { height: 0; }
.model-message { text-align: end; }
.model-dropdown-container { display: flex; align-items: center; gap: 10px; padding: 0; }
.user-input-container .multimodal-textbox{ border: none !important; }
.control-button { height: 51px; }
button.cancel { border: var(--button-border-width) solid var(--button-cancel-border-color); background: var(--button-cancel-background-fill); color: var(--button-cancel-text-color); box-shadow: var(--button-cancel-shadow); }
button.cancel:hover, .cancel[disabled] { background: var(--button-cancel-background-fill-hover); color: var(--button-cancel-text-color-hover); }
.opt-out-message { top: 8px; }
.opt-out-message .html-container, .opt-out-checkbox label { font-size: 14px !important; padding: 0 !important; margin: 0 !important; color: var(--neutral-400) !important; }
div.block.chatbot { height: calc(100svh - var(--chat-chrome-height-wide)) !important; max-height: 900px !important; }
div.no-padding { padding: 0 !important; }
@media (max-width: 1280px) { div.block.chatbot { height: calc(100svh - var(--chat-chrome-height-wide)) !important; } }
@media (max-width: 1024px) {
    .responsive-row { flex-direction: column; }
    .model-message { text-align: start; font-size: 10px !important; }
    .model-dropdown-container { flex-direction: column; align-items: flex-start; }
    div.block.chatbot { height: calc(100svh - var(--chat-chrome-height-narrow)) !important; }
}
@media (max-width: 400px) {
    .responsive-row { flex-direction: column; }
    .model-message { text-align: start; font-size: 10px !important; }
    .model-dropdown-container { flex-direction: column; align-items: flex-start; }
    div.block.chatbot { max-height: 360px !important; }
}
@media (max-height: 1280px) { div.block.chatbot { max-height: 800px !important; } }
@media (max-height: 932px) { div.block.chatbot { max-height: 500px !important; } }
"""
|
|
|
|
|
|
|
|
# Generation limits: slider upper bound, slider default, and the maximum
# prompt length (overridable through the MAX_INPUT_TOKEN_LENGTH env var).
MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
MAX_INPUT_TOKEN_LENGTH = int(os.environ.get("MAX_INPUT_TOKEN_LENGTH", "4096"))

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Startup diagnostics describing the torch / CUDA environment.
print(f"CUDA_VISIBLE_DEVICES= {os.environ.get('CUDA_VISIBLE_DEVICES')}")
print(f"torch.__version__ = {torch.__version__}")
print(f"torch.version.cuda = {torch.version.cuda}")
print(f"cuda available: {torch.cuda.is_available()}")
print(f"cuda device count: {torch.cuda.device_count()}")
if torch.cuda.is_available():
    print(f"current device: {torch.cuda.current_device()}")
    print(f"device name: {torch.cuda.get_device_name(torch.cuda.current_device())}")

print(f"Using device: {device}")
|
|
|
|
|
|
|
|
|
|
|
def _load_processor_and_model(model_cls, model_id):
    """Load the processor and the half-precision model for ``model_id``.

    The processor is loaded first, then the model is created with
    ``model_cls.from_pretrained``, moved to ``device`` and put in eval mode.
    NOTE(review): ``trust_remote_code=True`` is passed to both loaders -
    confirm the listed repositories are trusted.
    """
    processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
    model = model_cls.from_pretrained(
        model_id,
        trust_remote_code=True,
        torch_dtype=torch.float16,
    )
    model = model.to(device)
    return processor, model.eval()


# Nanonets-OCR-s
MODEL_ID_V = "nanonets/Nanonets-OCR-s"
processor_v, model_v = _load_processor_and_model(
    Qwen2_5_VLForConditionalGeneration, MODEL_ID_V
)

# Qwen2-VL-OCR-2B-Instruct
MODEL_ID_X = "prithivMLmods/Qwen2-VL-OCR-2B-Instruct"
processor_x, model_x = _load_processor_and_model(
    Qwen2VLForConditionalGeneration, MODEL_ID_X
)

# Aya-Vision-8B
MODEL_ID_A = "CohereForAI/aya-vision-8b"
processor_a, model_a = _load_processor_and_model(
    AutoModelForImageTextToText, MODEL_ID_A
)

# olmOCR-7B-0725
MODEL_ID_W = "allenai/olmOCR-7B-0725"
processor_w, model_w = _load_processor_and_model(
    Qwen2_5_VLForConditionalGeneration, MODEL_ID_W
)

# RolmOCR
MODEL_ID_M = "reducto/RolmOCR"
processor_m, model_m = _load_processor_and_model(
    Qwen2_5_VLForConditionalGeneration, MODEL_ID_M
)
|
|
|
|
|
def downsample_video(video_path, num_frames=10):
    """Sample evenly spaced frames from a video file.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        num_frames: How many evenly spaced frame positions to request
            (defaults to 10, the value that used to be hard-coded).

    Returns:
        A list of ``(PIL.Image, timestamp_seconds)`` tuples, one per frame
        that could be decoded. The list is empty when the video reports no
        frames. Frames that fail to decode are skipped, so the list may be
        shorter than ``num_frames``.
    """
    vidcap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = vidcap.get(cv2.CAP_PROP_FPS)

        # A frame count <= 0 would make linspace emit negative positions;
        # there is nothing to sample, so return early.
        if total_frames <= 0:
            return []

        frames = []
        frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
        for index in frame_indices:
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, int(index))
            success, image = vidcap.read()
            if not success:
                continue
            # OpenCV decodes to BGR; PIL expects RGB.
            rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            # Guard against containers that report fps == 0, which used to
            # produce inf/nan timestamps.
            timestamp = round(float(index) / fps, 2) if fps and fps > 0 else 0.0
            frames.append((Image.fromarray(rgb_image), timestamp))
        return frames
    finally:
        # Always free the capture handle, even if decoding raised.
        vidcap.release()
|
|
|
|
|
@spaces.GPU
def generate_image(model_name: str, text: str, image: Image.Image,
                   max_new_tokens: int = 1024,
                   temperature: float = 0.6,
                   top_p: float = 0.9,
                   top_k: int = 50,
                   repetition_penalty: float = 1.2):
    """Stream a model response for a single image plus a text query.

    Args:
        model_name: One of the display names offered by the model selector.
        text: The user query placed after the image in the chat message.
        image: The uploaded PIL image, or ``None`` when nothing was uploaded.
        max_new_tokens: Upper bound on generated tokens.
        temperature, top_p, top_k, repetition_penalty: Sampling controls.
            They are now forwarded to ``model.generate`` exactly as
            ``generate_video`` does; previously they were accepted but ignored.

    Yields:
        ``(raw_text, markdown_text)`` tuples holding the cumulative output so
        far (both elements are the same string), or a single error tuple when
        the model name is unknown or no image was supplied.
    """
    # Display name -> (processor, model) lookup.
    registry = {
        "RolmOCR-7B": (processor_m, model_m),
        "Qwen2-VL-OCR-2B": (processor_x, model_x),
        "Nanonets-OCR-s": (processor_v, model_v),
        "Aya-Vision-8B": (processor_a, model_a),
        "olmOCR-7B-0725": (processor_w, model_w),
    }
    selected = registry.get(model_name)
    if selected is None:
        yield "Invalid model selected.", "Invalid model selected."
        return
    processor, model = selected

    if image is None:
        yield "Please upload an image.", "Please upload an image."
        return

    messages = [{
        "role": "user",
        "content": [
            {"type": "image"},
            {"type": "text", "text": text},
        ]
    }]
    prompt_full = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor(
        text=[prompt_full],
        images=[image],
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=MAX_INPUT_TOKEN_LENGTH
    ).to(device)

    streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = {
        **inputs,
        "streamer": streamer,
        # UI sliders may deliver floats; generate() expects integers here.
        "max_new_tokens": int(max_new_tokens),
        "do_sample": True,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": int(top_k),
        "repetition_penalty": repetition_penalty,
    }
    # Generation runs in a worker thread so tokens can be streamed from here.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        buffer = buffer.replace("<|im_end|>", "")
        time.sleep(0.01)
        yield buffer, buffer

    # The streamer is exhausted, so generation is finishing; reap the worker.
    thread.join()
|
|
|
|
|
@spaces.GPU
def generate_video(model_name: str, text: str, video_path: str,
                   max_new_tokens: int = 1024,
                   temperature: float = 0.6,
                   top_p: float = 0.9,
                   top_k: int = 50,
                   repetition_penalty: float = 1.2):
    """Stream a model response for a video plus a text query.

    The video is reduced to sampled frames by ``downsample_video``. Each frame
    becomes one image placeholder in the chat message, followed by the text
    query.

    Args:
        model_name: One of the display names offered by the model selector.
        text: The user query placed after the frame placeholders.
        video_path: Path of the uploaded video, or ``None`` when missing.
        max_new_tokens: Upper bound on generated tokens.
        temperature, top_p, top_k, repetition_penalty: Sampling controls
            forwarded to ``model.generate``.

    Yields:
        ``(raw_text, markdown_text)`` tuples holding the cumulative output so
        far (both elements are the same string), or a single error tuple when
        the model name is unknown, no video was supplied, or no frame could be
        decoded.
    """
    # Display name -> (processor, model) lookup.
    registry = {
        "RolmOCR-7B": (processor_m, model_m),
        "Qwen2-VL-OCR-2B": (processor_x, model_x),
        "Nanonets-OCR-s": (processor_v, model_v),
        "Aya-Vision-8B": (processor_a, model_a),
        "olmOCR-7B-0725": (processor_w, model_w),
    }
    selected = registry.get(model_name)
    if selected is None:
        yield "Invalid model selected.", "Invalid model selected."
        return
    processor, model = selected

    if video_path is None:
        yield "Please upload a video.", "Please upload a video."
        return

    # Timestamps returned by downsample_video are not used in the prompt.
    images_for_processor = [frame for frame, _ in downsample_video(video_path)]
    if not images_for_processor:
        # Without this guard the processor would be called with an empty
        # image list and fail with an opaque error.
        message = "Could not read any frames from the video."
        yield message, message
        return

    # One image placeholder per frame, then the text query.
    content = [{"type": "image"} for _ in images_for_processor]
    content.append({"type": "text", "text": text})
    messages = [{"role": "user", "content": content}]

    prompt_full = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

    inputs = processor(
        text=[prompt_full],
        images=images_for_processor,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=MAX_INPUT_TOKEN_LENGTH
    ).to(device)

    streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = {
        **inputs,
        "streamer": streamer,
        # UI sliders may deliver floats; generate() expects integers here.
        "max_new_tokens": int(max_new_tokens),
        "do_sample": True,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": int(top_k),
        "repetition_penalty": repetition_penalty,
    }
    # Generation runs in a worker thread so tokens can be streamed from here.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        buffer = buffer.replace("<|im_end|>", "")
        time.sleep(0.01)
        yield buffer, buffer

    # The streamer is exhausted, so generation is finishing; reap the worker.
    thread.join()
|
|
|
|
|
|
|
|
# Example rows for the image tab: [query text, image path].
image_examples = [
    [query, f"images/{filename}"]
    for query, filename in (
        ("Extract the full page.", "ocr.png"),
        ("Extract the content.", "4.png"),
        ("Convert this page to doc [table] precisely for markdown.", "0.png"),
    )
]

# Example rows for the video tab: [query text, video path].
video_examples = [
    [query, f"videos/{filename}"]
    for query, filename in (
        ("Explain the Ad in Detail.", "1.mp4"),
        ("Identify the main actions in the cartoon video.", "2.mp4"),
    )
]
|
|
|
|
|
|
|
|
# Build the UI: inputs and advanced options in the left column, outputs and
# the model selector in the right column.
# NOTE(review): the source this was recovered from had lost its indentation,
# so the nesting of the ``with`` blocks below is reconstructed - confirm the
# layout (in particular where the accordion and the model selector sit).
with gr.Blocks(css=css, theme=thistle_theme) as demo:
    gr.Markdown("# **Multimodal OCR**", elem_id="main-title")

    with gr.Row():
        # Left column: query inputs per modality plus generation settings.
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.TabItem("Image Inference"):
                    image_query = gr.Textbox(
                        label="Query Input",
                        placeholder="Enter your query here...",
                    )
                    image_upload = gr.Image(type="pil", label="Upload Image", height=290)
                    image_submit = gr.Button("Submit", variant="primary")
                    gr.Examples(examples=image_examples, inputs=[image_query, image_upload])

                with gr.TabItem("Video Inference"):
                    video_query = gr.Textbox(
                        label="Query Input",
                        placeholder="Enter your query here...",
                    )
                    video_upload = gr.Video(label="Upload Video", height=290)
                    video_submit = gr.Button("Submit", variant="primary")
                    gr.Examples(examples=video_examples, inputs=[video_query, video_upload])

            with gr.Accordion("Advanced options", open=False):
                max_new_tokens = gr.Slider(
                    label="Max new tokens",
                    minimum=1,
                    maximum=MAX_MAX_NEW_TOKENS,
                    step=1,
                    value=DEFAULT_MAX_NEW_TOKENS,
                )
                temperature = gr.Slider(
                    label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6
                )
                top_p = gr.Slider(
                    label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9
                )
                top_k = gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50)
                repetition_penalty = gr.Slider(
                    label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2
                )

        # Right column: streamed raw text, rendered Markdown, model selector.
        with gr.Column(scale=3):
            gr.Markdown("## Output", elem_id="output-title")
            output = gr.Textbox(
                label="Raw Output Stream",
                interactive=False,
                lines=11,
                show_copy_button=True,
            )
            with gr.Accordion("(Result.md)", open=False):
                latex_rules = [
                    {"left": "$$", "right": "$$", "display": True},
                    {"left": "$", "right": "$", "display": False},
                ]
                markdown_output = gr.Markdown(label="(Result.Md)", latex_delimiters=latex_rules)

            model_choice = gr.Radio(
                choices=[
                    "olmOCR-7B-0725",
                    "Nanonets-OCR-s",
                    "RolmOCR-7B",
                    "Aya-Vision-8B",
                    "Qwen2-VL-OCR-2B",
                ],
                label="Select Model",
                value="olmOCR-7B-0725",
            )

    # Both submit buttons share the same sampling controls and output widgets.
    sampling_controls = [max_new_tokens, temperature, top_p, top_k, repetition_penalty]
    result_components = [output, markdown_output]

    image_submit.click(
        fn=generate_image,
        inputs=[model_choice, image_query, image_upload, *sampling_controls],
        outputs=result_components,
    )
    video_submit.click(
        fn=generate_video,
        inputs=[model_choice, video_query, video_upload, *sampling_controls],
        outputs=result_components,
    )
|
|
|
|
|
if __name__ == "__main__":
    # Enable request queuing (max_size=50), then launch the app with the
    # options below.
    launch_options = {"mcp_server": True, "ssr_mode": False, "show_error": True}
    demo.queue(max_size=50).launch(**launch_options)