import os
import subprocess
import tempfile
# subprocess.run('pip install flash-attn==2.8.0 --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, shell=True)
import threading
# subprocess.check_call([os.sys.executable, "-m", "pip", "install", "-r", "requirements.txt"])
import spaces
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoProcessor, AutoTokenizer, TextIteratorStreamer
from analytics import AnalyticsLogger
from kernels import get_kernel
from typing import Any, Optional, Dict
from PIL import Image
import base64
import io
#vllm_flash_attn3 = get_kernel("kernels-community/vllm-flash-attn3")
#torch._dynamo.config.disable = True
HF_LE_LLM_READ_TOKEN = os.environ.get('HF_LE_LLM_READ_TOKEN')
from huggingface_hub import login
login(token=HF_LE_LLM_READ_TOKEN)
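# HF_LE_LLM_READ_TOKEN is expected to be set in the environment (e.g. as a Space secret);
# login() authenticates this process so the le-llm checkpoints below can be downloaded
# if they are gated or private.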
#MODEL_ID = "le-llm/lapa-v0.1-reasoning-only-32768"
MODEL_ID = "le-llm/lapa-v0.1-instruct"
MODEL_ID = "le-llm/lapa-v0.1-matt-instruction-5e06"
MODEL_ID = "le-llm/lapa-v0.1-reprojected"
logger = AnalyticsLogger()
def _begin_analytics_session():
# Called once per client on app load
_ = logger.start_session(MODEL_ID)
def load_model():
"""Lazy-load model, tokenizer, and optional processor (for zeroGPU)."""
device = "cuda" # if torch.cuda.is_available() else "cpu"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
processor = None
try:
processor = AutoProcessor.from_pretrained(MODEL_ID)
except Exception as err: # pragma: no cover - informative fallback
print(f"Warning: AutoProcessor not available ({err}). Falling back to tokenizer.")
model = AutoModelForCausalLM.from_pretrained(
MODEL_ID,
dtype=torch.bfloat16, # if device == "cuda" else torch.float32,
device_map="auto", # if device == "cuda" else None,
        attn_implementation="flash_attention_2",  # alternative: "kernels-community/vllm-flash-attn3"
) # .cuda()
print(f"Selected device:", device)
return model, tokenizer, processor, device
# Load model/tokenizer once at import time; the @spaces.GPU-decorated bot() below acquires the GPU per request on zeroGPU
model, tokenizer, processor, device = load_model()
def user(user_message, image_data, history: list):
"""Format user message with optional image."""
user_message = user_message or ""
updated_history = list(history)
has_content = False
stripped_message = user_message.strip()
    # If we have an image, save it to a temp file so Gradio can render it in the chat
if image_data is not None:
# Save to temp file for Gradio display
fd, tmp_path = tempfile.mkstemp(suffix=".jpg")
os.close(fd)
image_data.save(tmp_path, format="JPEG")
        text_content = stripped_message if stripped_message else "Describe this image"
        # Append the text and the image as two consecutive user messages; Gradio's
        # "messages" chatbot renders file content from a dict with a "path" key.
updated_history.append({
"role": "user",
"content": text_content
})
updated_history.append({
"role": "user",
"content": {
"path": tmp_path,
"alt_text": "User uploaded image"
},
})
has_content = True
elif stripped_message:
updated_history.append({"role": "user", "content": stripped_message})
has_content = True
if not has_content:
# Nothing to submit yet; keep inputs unchanged
return user_message, image_data, history
return "", None, updated_history
def append_example_message(x: gr.SelectData, history):
print(x)
print(x.value)
print(x.value["text"])
if x.value["text"] is not None:
history.append({"role": "user", "content": x.value["text"]})
return history
def _extract_text_from_content(content: Any) -> str:
"""Extract text from message content for logging."""
if isinstance(content, str):
return content
if isinstance(content, list):
text_parts = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
text_parts.append(item.get("text", ""))
return " ".join(text_parts) if text_parts else "[Image]"
return str(content)
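# Example: [{"type": "text", "text": "hello"}, {"type": "image_url", ...}] -> "hello",
# while a content list with no text parts is logged as the placeholder "[Image]".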
def _clean_history_for_display(history: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Remove internal metadata fields like _base64 before displaying in Gradio."""
cleaned = []
for message in history:
cleaned_message = {"role": message.get("role", "user")}
content = message.get("content")
if isinstance(content, str):
cleaned_message["content"] = content
elif isinstance(content, list):
cleaned_content = []
for item in content:
if isinstance(item, dict):
# Remove _base64 metadata
cleaned_item = {k: v for k, v in item.items() if not k.startswith("_")}
cleaned_content.append(cleaned_item)
else:
cleaned_content.append(item)
cleaned_message["content"] = cleaned_content
else:
cleaned_message["content"] = content
cleaned.append(cleaned_message)
return cleaned
def format_message_with_image(
text: str, role: str, image: Optional[Image.Image] = None
) -> Dict[str, Any]:
"""Format message for VLLM API with optional image."""
if image is not None:
# Convert PIL image to base64
buffered = io.BytesIO()
image.save(buffered, format="JPEG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
return {
"role": role,
"content": [
{"type": "text", "text": text},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{img_base64}"},
},
],
}
else:
return {"role": role, "content": text}
@spaces.GPU
def bot(
history: list[dict[str, Any]]
):
"""Generate bot response with support for text and images."""
max_tokens = 4096
temperature = 0.7
top_p = 0.95
# Early return if no input
if not history:
return
# Extract last user message for logging
last_user_msg = next((msg for msg in reversed(history) if msg.get("role") == "user"), None)
user_message_text = _extract_text_from_content(last_user_msg.get("content")) if last_user_msg else ""
print('User message:', user_message_text)
# Check if any message contains images
has_images = any(
isinstance(msg.get("content"), list) and
any(item.get("type") == "image" for item in msg.get("content") if isinstance(item, dict))
for msg in history
)
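    # has_images is only True when a message's content is a list of typed parts that
    # includes {"type": "image", ...}; messages added by `user` store the image as a
    # plain path dict, so those fall through to the text-only tokenizer path below.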
model_inputs = None
# Use processor if images are present
if processor is not None and has_images:
try:
processor_history = []
for msg in history:
role = msg.get("role", "user")
content = msg.get("content")
if isinstance(content, str):
processor_history.append({"role": role, "content": content})
elif isinstance(content, list):
formatted_content = []
for item in content:
if isinstance(item, dict):
# Add text
if item.get("type") == "text":
formatted_content.append({"type": "text", "text": item.get("text", "")})
elif item.get("type") == "image":
# Use _base64 metadata if available, otherwise load from path
pil_image = None
if "_base64" in item:
img_url = item["_base64"]
if img_url.startswith("data:image"):
base64_data = img_url.split(",")[1]
img_data = base64.b64decode(base64_data)
pil_image = Image.open(io.BytesIO(img_data))
elif "path" in item:
pil_image = Image.open(item["path"])
if pil_image is not None:
# formatted_content.append({"type": "image", "image": pil_image})
buffered = io.BytesIO()
pil_image.save(buffered, format="JPEG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
                                    formatted_content.append({
                                        "type": "image_url",
                                        "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"},
                                    })
if formatted_content:
processor_history.append({"role": role, "content": formatted_content})
model_inputs = processor(
messages=processor_history,
return_tensors="pt",
add_generation_prompt=True,
).to(model.device)
print("Using processor for vision input")
except Exception as exc:
print(f"Processor failed: {exc}")
model_inputs = None
# Fallback to tokenizer for text-only
if model_inputs is None:
# Convert to text-only format for tokenizer
text_history = []
for msg in history:
role = msg.get("role", "user")
content = msg.get("content")
text_content = _extract_text_from_content(content)
if text_content:
text_history.append({"role": role, "content": text_content})
if text_history:
input_text = tokenizer.apply_chat_template(
text_history,
tokenize=False,
add_generation_prompt=True,
)
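            # apply_chat_template already prepends the BOS token; strip the first occurrence
            # so the tokenizer call below does not add a duplicate BOS.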
if input_text and tokenizer.bos_token:
input_text = input_text.replace(tokenizer.bos_token, "", 1)
model_inputs = tokenizer(input_text, return_tensors="pt").to(model.device)
print("Using tokenizer for text-only input")
if model_inputs is None:
return
# Streamer setup
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True)
# Run model.generate in background thread
generation_kwargs = dict(
**model_inputs,
max_new_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
top_k=64,
do_sample=True,
streamer=streamer,
)
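    # model.generate() blocks until completion, so it runs in a background thread while
    # TextIteratorStreamer hands decoded text chunks to the loop below as they are produced.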
thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
history.append({"role": "assistant", "content": ""})
# Yield tokens as they come in
for new_text in streamer:
history[-1]["content"] += new_text
yield _clean_history_for_display(history)
assistant_message = history[-1]["content"]
logger.log_interaction(user=user_message_text, answer=assistant_message)
# --- drop-in UI compatible with older Gradio versions ---
# Ukrainian-inspired theme with deep, muted colors reflecting unbeatable spirit:
THEME = gr.themes.Soft(
primary_hue="blue", # Deep blue representing Ukrainian sky and resolve
secondary_hue="amber", # Warm amber representing golden fields and determination
neutral_hue="stone", # Earthy stone representing strength and foundation
)
# Load CSS from external file
def load_css():
try:
with open("static/style.css", "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
print("Warning: static/style.css not found")
return ""
CSS = load_css()
def _clear_chat():
return "", None, []
with gr.Blocks(theme=THEME, css=CSS, fill_height=True) as demo:
demo.load(fn=_begin_analytics_session, inputs=None, outputs=None)
# Header (no gr.Box to avoid version issues)
gr.HTML(
"""
<div id="app-header">
<div class="app-title">✨ LAPA</div>
<div class="app-subtitle">LLM for Ukrainian Language</div>
</div>
"""
)
with gr.Row(equal_height=True):
# Left side: Chat
with gr.Column(scale=7, elem_id="left-pane"):
with gr.Column(elem_id="chat-card"):
chatbot = gr.Chatbot(
type="messages",
height=560,
render_markdown=True,
show_copy_button=True,
show_label=False,
# likeable=True,
allow_tags=["think"],
elem_id="chatbot",
examples=[
{"text": i}
for i in [
"хто тримає цей район?",
"Напиши історію про Івасика-Телесика",
"Яка найвища гора в Україні?",
"Як звали батька Тараса Григоровича Шевченка?",
"Яка з цих гір не знаходиться у Європі? Говерла, Монблан, Гран-Парадізо, Еверест",
"Дай відповідь на питання\nЧому у качки жовті ноги?",
]
],
)
image_input = gr.Image(
label="Attach image (optional)",
type="pil",
sources=["upload", "clipboard"],
height=200,
interactive=True,
elem_id="image-input",
)
# ChatGPT-style input box with stop button
with gr.Row(elem_id="chat-input-row"):
msg = gr.Textbox(
label=None,
placeholder="Message… (Press Enter to send)",
autofocus=True,
lines=1,
max_lines=6,
container=False,
show_label=False,
elem_id="chat-input",
elem_classes=["chat-input-box"]
)
stop_btn_visible = gr.Button(
"⏹️",
variant="secondary",
elem_id="stop-btn-visible",
elem_classes=["stop-btn-chat"],
visible=False,
size="sm"
)
# Hidden buttons for functionality
with gr.Row(visible=True, elem_id="hidden-buttons"):
send_btn = gr.Button("Send", variant="primary", elem_id="send-btn")
stop_btn = gr.Button("Stop", variant="secondary", elem_id="stop-btn")
clear_btn = gr.Button("Clear", variant="secondary", elem_id="clear-btn")
# export_btn = gr.Button("Export chat (.md)", variant="secondary", elem_classes=["rounded-btn","secondary-btn"])
# exported_file = gr.File(label="", interactive=False, visible=True)
gr.HTML('<div class="footer-tip">Shortcuts: Enter to send • Shift+Enter for new line</div>')
# Helper functions for managing UI state
def show_stop_button():
return gr.update(visible=True)
def hide_stop_button():
return gr.update(visible=False)
# Events (preserve your original handlers)
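    # Each chain: append the user turn, show the stop button, stream the bot reply,
    # then hide the stop button again; Stop cancels a running chain via cancels=[e1, e2, e3].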
e1 = msg.submit(fn=user, inputs=[msg, image_input, chatbot], outputs=[msg, image_input, chatbot], queue=True).then(
fn=show_stop_button, inputs=None, outputs=stop_btn_visible
).then(
fn=bot, inputs=chatbot, outputs=chatbot
).then(
fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
)
e2 = send_btn.click(fn=user, inputs=[msg, image_input, chatbot], outputs=[msg, image_input, chatbot], queue=True).then(
fn=show_stop_button, inputs=None, outputs=stop_btn_visible
).then(
fn=bot, inputs=chatbot, outputs=chatbot
).then(
fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
)
e3 = chatbot.example_select(fn=append_example_message, inputs=[chatbot], outputs=[chatbot], queue=True).then(
fn=show_stop_button, inputs=None, outputs=stop_btn_visible
).then(
fn=bot, inputs=chatbot, outputs=chatbot
).then(
fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
)
# Stop cancels running events (both buttons work)
stop_btn.click(fn=hide_stop_button, inputs=None, outputs=stop_btn_visible, cancels=[e1, e2, e3], queue=True)
stop_btn_visible.click(fn=hide_stop_button, inputs=None, outputs=stop_btn_visible, cancels=[e1, e2, e3], queue=True)
# Clear chat + input
clear_btn.click(fn=_clear_chat, inputs=None, outputs=[msg, image_input, chatbot])
# Export markdown
# export_btn.click(fn=_export_markdown, inputs=chatbot, outputs=exported_file)
# Load and inject external JavaScript
def load_javascript():
try:
with open("static/script.js", "r", encoding="utf-8") as f:
return f"<script>{f.read()}</script>"
except FileNotFoundError:
print("Warning: static/script.js not found")
return ""
gr.HTML(load_javascript())
if __name__ == "__main__":
demo.queue().launch()