Spaces:
Paused
Paused
| import base64 | |
| import re | |
| import textwrap | |
| from io import BytesIO | |
| from PIL import Image, ImageDraw, ImageFont | |
| from proxy_lite.environments.environment_base import Action, Observation | |
| from proxy_lite.recorder import Run | |
| def create_run_gif( | |
| run: Run, output_path: str, white_panel_width: int = 300, duration: int = 1500, resize_factor: int = 4 | |
| ) -> None: | |
| """ | |
| Generate a gif from the Run object's history. | |
| For each Observation record, the observation image is decoded from its base64 | |
| encoded string. If the next record is an Action, its text is drawn onto a | |
| white panel. The observation image and the white panel are then concatenated | |
| horizontally to produce a frame. | |
| Parameters: | |
| run (Run): A Run object with its history containing Observation and Action records. | |
| output_path (str): The path where the GIF will be saved. | |
| white_panel_width (int): The width of the white panel for displaying text. | |
| Default increased to 400 for larger images. | |
| duration (int): Duration between frames in milliseconds. | |
| Increased here to slow the FPS (default is 1000ms). | |
| resize_factor (int): The factor to resize the image down by. | |
| """ | |
| frames = [] | |
| history = run.history | |
| i = 0 | |
| while i < len(history): | |
| if isinstance(history[i], Observation): | |
| observation = history[i] | |
| image_data = observation.state.image | |
| if not image_data: | |
| i += 1 | |
| continue | |
| # Decode the base64 image | |
| image_bytes = base64.b64decode(image_data) | |
| obs_img = Image.open(BytesIO(image_bytes)).convert("RGB") | |
| # scale the image down | |
| obs_img = obs_img.resize((obs_img.width // resize_factor, obs_img.height // resize_factor)) | |
| # Check if the next record is an Action and extract its text if available | |
| action_text = "" | |
| if i + 1 < len(history) and isinstance(history[i + 1], Action): | |
| action = history[i + 1] | |
| if action.text: | |
| action_text = action.text | |
| # extract observation and thinking from tags in the action text | |
| observation_match = re.search(r"<observation>(.*?)</observation>", action_text, re.DOTALL) | |
| observation_content = observation_match.group(1).strip() if observation_match else None | |
| # Extract text between thinking tags if present | |
| thinking_match = re.search(r"<thinking>(.*?)</thinking>", action_text, re.DOTALL) | |
| thinking_content = thinking_match.group(1).strip() if thinking_match else None | |
| if observation_content and thinking_content: | |
| action_text = f"**OBSERVATION**\n{observation_content}\n\n**THINKING**\n{thinking_content}" | |
| # Create a white panel (same height as the observation image) | |
| panel = Image.new("RGB", (white_panel_width, obs_img.height), "white") | |
| draw = ImageDraw.Draw(panel) | |
| font = ImageFont.load_default() | |
| # Wrap the action text if it is too long | |
| max_chars_per_line = 40 # Adjusted for larger font size | |
| wrapped_text = textwrap.fill(action_text, width=max_chars_per_line) | |
| # Calculate text block size and center it on the panel | |
| try: | |
| # Use multiline_textbbox if available (returns bounding box tuple) | |
| bbox = draw.multiline_textbbox((0, 0), wrapped_text, font=font) | |
| text_width, text_height = bbox[2] - bbox[0], bbox[3] - bbox[1] | |
| except AttributeError: | |
| # Fallback for older Pillow versions: compute size for each line | |
| lines = wrapped_text.splitlines() or [wrapped_text] | |
| line_sizes = [draw.textsize(line, font=font) for line in lines] | |
| text_width = max(width for width, _ in line_sizes) | |
| text_height = sum(height for _, height in line_sizes) | |
| text_x = (white_panel_width - text_width) // 2 | |
| text_y = (obs_img.height - text_height) // 2 | |
| draw.multiline_text((text_x, text_y), wrapped_text, fill="black", font=font, align="center") | |
| # Create the combined frame by concatenating the observation image and the panel | |
| total_width = obs_img.width + white_panel_width | |
| combined_frame = Image.new("RGB", (total_width, obs_img.height)) | |
| combined_frame.paste(obs_img, (0, 0)) | |
| combined_frame.paste(panel, (obs_img.width, 0)) | |
| frames.append(combined_frame) | |
| # Skip the Action record since it has been processed with this Observation | |
| if i + 1 < len(history) and isinstance(history[i + 1], Action): | |
| i += 2 | |
| else: | |
| i += 1 | |
| else: | |
| i += 1 | |
| if frames: | |
| frames[0].save(output_path, save_all=True, append_images=frames[1:], duration=duration, loop=0) | |
| else: | |
| raise ValueError("No frames were generated from the Run object's history.") | |
| # Example usage: | |
| if __name__ == "__main__": | |
| from proxy_lite.recorder import Run | |
| dummy_run = Run.load("0abdb4cb-f289-48b0-ba13-35ed1210f7c1") | |
| num_steps = int(len(dummy_run.history) / 2) | |
| print(f"Number of steps: {num_steps}") | |
| output_gif_path = "trajectory.gif" | |
| create_run_gif(dummy_run, output_gif_path, duration=1000) | |
| print(f"Trajectory GIF saved to {output_gif_path}") | |