import os
import sys
import uuid
import spaces
import torch
import random
import numpy as np
from PIL import Image
import open3d as o3d
import matplotlib.pyplot as plt
from transformers import AutoProcessor, AutoModelForCausalLM
from transformers import SamModel, SamProcessor
import depth_pro
import spacy
import gradio as gr

try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    # Download the model if it's not already available
    from spacy.cli import download
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")


def find_subject(doc):
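    """Return the first syntactic subject token's text and its head (verb), or (None, None)."""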
    for token in doc:
        # Check if the token is a subject
        if "subj" in token.dep_:
            return token.text, token.head
    return None, None


def extract_descriptions(doc, head):
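    """Collect noun chunks attached to `head` or marked as attributes."""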
    descriptions = []
    for chunk in doc.noun_chunks:
        # Check if the chunk is directly related to the subject's verb or is an attribute
        if chunk.root.head == head or chunk.root.dep_ == 'attr':
            descriptions.append(chunk.text)
    return descriptions


def caption_refiner(caption):
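    """Condense a caption to the noun phrases tied to its subject's verb; fall back to the raw caption."""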
    doc = nlp(caption)
    subject, action_verb = find_subject(doc)
    if action_verb:
        descriptions = extract_descriptions(doc, action_verb)
        return ', '.join(descriptions)
    else:
        return caption


def sam2(image, input_boxes, model_id="facebook/sam-vit-base"):
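    """Segment the region inside `input_boxes` with SAM and return post-processed masks.

    Note: the model is reloaded on every call.
    """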
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = SamModel.from_pretrained(model_id).to(device)
    processor = SamProcessor.from_pretrained(model_id)
    inputs = processor(image, input_boxes=[[input_boxes]], return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = model(**inputs)
    masks = processor.image_processor.post_process_masks(
        outputs.pred_masks.cpu(), inputs["original_sizes"].cpu(), inputs["reshaped_input_sizes"].cpu()
    )
    return masks


def load_florence2(model_id="microsoft/Florence-2-base-ft", device='cuda'):
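    """Load the Florence-2 model and processor (fp16 on CUDA, fp32 on CPU)."""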
    torch_dtype = torch.float16 if device == 'cuda' else torch.float32
    florence_model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch_dtype, trust_remote_code=True).to(device)
    florence_processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
    return florence_model, florence_processor


def florence2(image, prompt="", task="<OD>"):
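    """Run a Florence-2 task (captioning, grounding, ...) and return the parsed answer for `task`."""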
    device = florence_model.device
    torch_dtype = florence_model.dtype
    inputs = florence_processor(text=task + prompt, images=image, return_tensors="pt").to(device, torch_dtype)
    generated_ids = florence_model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        max_new_tokens=1024,
        num_beams=3,
        do_sample=False
    )
    generated_text = florence_processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
    parsed_answer = florence_processor.post_process_generation(generated_text, task=task, image_size=(image.width, image.height))
    return parsed_answer[task]


def depth_estimation(image_path):
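    """Estimate metric depth and focal length (in pixels) with Depth Pro (uses the global `model`/`transform`)."""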
    model.eval()
    image, _, f_px = depth_pro.load_rgb(image_path)
    image = transform(image)
    # Run inference.
    prediction = model.infer(image, f_px=f_px)
    depth = prediction["depth"]  # Depth in [m].
    focallength_px = prediction["focallength_px"]  # Focal length in pixels.
    depth = depth.cpu().numpy()
    return depth, focallength_px


def create_point_cloud_from_rgbd(rgb, depth, intrinsic_parameters):
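    """Back-project an RGB-D pair into an Open3D point cloud using pinhole intrinsics."""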
    rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(
        o3d.geometry.Image(rgb),
        o3d.geometry.Image(depth),
        depth_scale=10.0,
        depth_trunc=100.0,
        convert_rgb_to_intensity=False
    )
    intrinsic = o3d.camera.PinholeCameraIntrinsic()
    intrinsic.set_intrinsics(intrinsic_parameters['width'], intrinsic_parameters['height'],
                             intrinsic_parameters['fx'], intrinsic_parameters['fy'],
                             intrinsic_parameters['cx'], intrinsic_parameters['cy'])
    pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_image, intrinsic)
    return pcd


def canonicalize_point_cloud(pcd, canonicalize_threshold=0.3):
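    """Rotate the cloud so the dominant RANSAC plane (assumed floor) becomes y-up.

    Returns (pcd, canonicalized, transformation); transformation is None when the
    plane covers less than `canonicalize_threshold` of the points.
    """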
    # Segment the largest plane, assumed to be the floor
    plane_model, inliers = pcd.segment_plane(distance_threshold=0.01, ransac_n=3, num_iterations=1000)
    canonicalized = False
    if len(inliers) / len(pcd.points) > canonicalize_threshold:
        canonicalized = True
        # Ensure the plane normal points upwards
        if np.dot(plane_model[:3], [0, 1, 0]) < 0:
            plane_model = -plane_model
        # Normalize the plane normal vector
        normal = plane_model[:3] / np.linalg.norm(plane_model[:3])
        # Compute the new basis vectors
        new_y = normal
        new_x = np.cross(new_y, [0, 0, -1])
        new_x /= np.linalg.norm(new_x)
        new_z = np.cross(new_x, new_y)
        # Create the transformation matrix
        transformation = np.identity(4)
        transformation[:3, :3] = np.vstack((new_x, new_y, new_z)).T
        transformation[:3, 3] = -np.dot(transformation[:3, :3], pcd.points[inliers[0]])
        # Apply the transformation
        pcd.transform(transformation)
        # Additional 180-degree rotation around the Z-axis
        rotation_z_180 = np.array([[np.cos(np.pi), -np.sin(np.pi), 0],
                                   [np.sin(np.pi), np.cos(np.pi), 0],
                                   [0, 0, 1]])
        pcd.rotate(rotation_z_180, center=(0, 0, 0))
        return pcd, canonicalized, transformation
    else:
        return pcd, canonicalized, None


def compute_iou(box1, box2):
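    """Compute intersection-over-union of two (x_min, y_min, x_max, y_max) boxes."""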
    # Extract the coordinates
    x1_min, y1_min, x1_max, y1_max = box1
    x2_min, y2_min, x2_max, y2_max = box2
    # Compute the intersection rectangle
    x_inter_min = max(x1_min, x2_min)
    y_inter_min = max(y1_min, y2_min)
    x_inter_max = min(x1_max, x2_max)
    y_inter_max = min(y1_max, y2_max)
    # Intersection width and height
    inter_width = max(0, x_inter_max - x_inter_min)
    inter_height = max(0, y_inter_max - y_inter_min)
    # Intersection area
    inter_area = inter_width * inter_height
    # Boxes areas
    box1_area = (x1_max - x1_min) * (y1_max - y1_min)
    box2_area = (x2_max - x2_min) * (y2_max - y2_min)
    # Union area
    union_area = box1_area + box2_area - inter_area
    # Intersection over Union
    iou = inter_area / union_area if union_area != 0 else 0
    return iou


def human_like_distance(distance_meters, scale_factor=10):
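    """Render a metric distance as a casual phrase, sampling the unit by distance range."""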
    # Define the choices with units included, focusing on the 0.1 to 10 meters range
    distance_meters *= scale_factor
    if distance_meters < 1:  # For distances less than 1 meter
        choices = [
            (
                round(distance_meters * 100, 2),
                "centimeters",
                0.2,
            ),  # Centimeters for very small distances
            (
                round(distance_meters * 39.3701, 2),  # Convert meters to inches
                "inches",
                0.8,
            ),  # Inches for the majority of cases under 1 meter
        ]
    elif distance_meters < 3:  # For distances less than 3 meters
        choices = [
            (round(distance_meters, 2), "meters", 0.5),
            (
                round(distance_meters * 3.28084, 2),  # Convert meters to feet
                "feet",
                0.5,
            ),  # Feet as a common unit within indoor spaces
        ]
    else:  # For distances from 3 up to 10 meters
        choices = [
            (
                round(distance_meters, 2),
                "meters",
                0.7,
            ),  # Meters for clarity and international understanding
            (
                round(distance_meters * 3.28084, 2),  # Convert meters to feet
                "feet",
                0.3,
            ),  # Feet for additional context
        ]
    # Normalize probabilities and make a selection
    total_probability = sum(prob for _, _, prob in choices)
    cumulative_distribution = []
    cumulative_sum = 0
    for value, unit, probability in choices:
        cumulative_sum += probability / total_probability  # Normalize probabilities
        cumulative_distribution.append((cumulative_sum, value, unit))
    # Randomly choose based on the cumulative distribution
    r = random.random()
    for cumulative_prob, value, unit in cumulative_distribution:
        if r < cumulative_prob:
            return f"{value} {unit}"
    # Fallback to the last choice if something goes wrong
    return f"{choices[-1][0]} {choices[-1][1]}"


def filter_bboxes(data, iou_threshold=0.5):
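    """Deduplicate grounding results, keeping the first box seen for each label.

    The IoU check is currently disabled, so `iou_threshold` is unused.
    """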
    filtered_bboxes = []
    filtered_labels = []
    for i in range(len(data['bboxes'])):
        current_box = data['bboxes'][i]
        current_label = data['labels'][i]
        is_duplicate = False
        for j in range(len(filtered_bboxes)):
            # IoU check disabled: any repeated label counts as a duplicate.
            if current_label == filtered_labels[j]:  # and compute_iou(current_box, filtered_bboxes[j]) > iou_threshold:
                is_duplicate = True
                break
        if not is_duplicate:
            filtered_bboxes.append(current_box)
            filtered_labels.append(current_label)
    return {'bboxes': filtered_bboxes, 'labels': filtered_labels, 'caption': data['caption']}


def process_image(image_path: str):
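    """Full pipeline: depth -> caption -> grounding -> SAM masks -> object point clouds.

    Returns a path to an OBJ export of the fused point cloud and a sentence
    describing the distance between two selected objects.
    """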
    depth, fx = depth_estimation(image_path)
    img = Image.open(image_path).convert('RGB')
    width, height = img.size
    description = florence2(img, task="<MORE_DETAILED_CAPTION>")
    print(description)
    regions = []
    for cap in description.split('.'):
        if cap:
            roi = florence2(img, prompt=" " + cap, task="<CAPTION_TO_PHRASE_GROUNDING>")
            roi["caption"] = caption_refiner(cap.lower())
            roi = filter_bboxes(roi)
            if len(roi['bboxes']) > 1:
                flip = random.choice(['heads', 'tails'])
                if flip == 'heads':
                    idx = random.randint(1, len(roi['bboxes']) - 1)
                else:
                    idx = 0
                if idx > 0:  # test bbox IOU
                    roi['caption'] = roi['labels'][idx].lower() + ' with ' + roi['labels'][0].lower()
                    roi['bboxes'] = [roi['bboxes'][idx]]
                    roi['labels'] = [roi['labels'][idx]]
            if roi['bboxes']:
                regions.append(roi)
                print(roi)
    bboxes = [item['bboxes'][0] for item in regions]
    n = len(bboxes)
    distance_matrix = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            if i != j:
                distance_matrix[i][j] = 1 - compute_iou(bboxes[i], bboxes[j])
    # Prefer spatially separated regions: a high row sum means low overlap with the rest.
    scores = np.sum(distance_matrix, axis=1)
    selected_indices = np.argsort(scores)[-3:]
    regions = [(regions[i]['bboxes'][0], regions[i]['caption']) for i in selected_indices][:2]
    # Create point cloud
    camera_intrinsics = {
        'width': width,
        'height': height,
        'fx': fx,
        'fy': fx * height / width,
        'cx': width / 2,
        'cy': height / 2,
    }
    pcd = create_point_cloud_from_rgbd(np.array(img).copy(), depth, camera_intrinsics)
    normed_pcd, canonicalized, transformation = canonicalize_point_cloud(pcd)
    masks = []
    for box, cap in regions:
        masks.append((cap, sam2(img, box)))
    point_clouds = []
    for cap, mask in masks:
        m = mask[0].numpy()[0].squeeze().transpose((1, 2, 0))
        mask = np.any(m, axis=2)
        try:
            points = np.asarray(normed_pcd.points)
            colors = np.asarray(normed_pcd.colors)
            masked_points = points[mask.ravel()]
            masked_colors = colors[mask.ravel()]
            masked_point_cloud = o3d.geometry.PointCloud()
            masked_point_cloud.points = o3d.utility.Vector3dVector(masked_points)
            masked_point_cloud.colors = o3d.utility.Vector3dVector(masked_colors)
            point_clouds.append((cap, masked_point_cloud))
        except Exception:
            # Indexing fails when the cloud has fewer points than mask pixels
            # (e.g. pixels dropped for invalid depth); skip that region.
            pass
    boxes3D = []
    centers = []
    pcd = o3d.geometry.PointCloud()
    for cap, pc in point_clouds[:2]:
        cl, ind = pc.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)
        inlier_cloud = pc.select_by_index(ind)
        pcd += inlier_cloud
        obb = inlier_cloud.get_axis_aligned_bounding_box()
        obb.color = (1, 0, 0)
        centers.append(obb.get_center())
        boxes3D.append(obb)
    lines = [[0, 1]]
    points = [centers[0], centers[1]]
    distance = human_like_distance(np.asarray(point_clouds[0][1].compute_point_cloud_distance(point_clouds[-1][1])).mean())
    text_output = "Distance between {} and {} is: {}".format(point_clouds[0][0], point_clouds[-1][0], distance)
    print(text_output)
    colors = [[1, 0, 0] for i in range(len(lines))]  # Red color for lines
    line_set = o3d.geometry.LineSet(
        points=o3d.utility.Vector3dVector(points),
        lines=o3d.utility.Vector2iVector(lines)
    )
    line_set.colors = o3d.utility.Vector3dVector(colors)
    boxes3D.append(line_set)
    uuid_out = str(uuid.uuid4())
    ply_file = f"output_{uuid_out}.ply"
    obj_file = f"output_{uuid_out}.obj"
    o3d.io.write_point_cloud(ply_file, pcd)
    mesh = o3d.io.read_triangle_mesh(ply_file)
    o3d.io.write_triangle_mesh(obj_file, mesh)
    return obj_file, text_output


def custom_draw_geometry_with_rotation(pcd, boxes3D=None):
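    """Debug helper: open an Open3D window that slowly rotates around the given geometries."""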
    boxes3D = boxes3D or []

    def rotate_view(vis):
        ctr = vis.get_view_control()
        vis.get_render_option().background_color = [0, 0, 0]
        ctr.rotate(1.0, 0.0)
        # https://github.com/isl-org/Open3D/issues/1483
        # parameters = o3d.io.read_pinhole_camera_parameters("ScreenCamera_2024-10-24-10-03-57.json")
        # ctr.convert_from_pinhole_camera_parameters(parameters)
        return False

    o3d.visualization.draw_geometries_with_animation_callback([pcd] + boxes3D, rotate_view)


def build_demo():
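    """Assemble the Gradio Blocks UI: image input, Generate button, 3D viewer, and caption output."""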
    with gr.Blocks() as demo:
        # Title and introductory Markdown
        gr.Markdown("""
        # Synthesizing SpatialVQA Samples with VQASynth
        This space helps test the full [VQASynth](https://github.com/remyxai/VQASynth) scene reconstruction pipeline on a single image with visualizations.
        ### [Github](https://github.com/remyxai/VQASynth) | [Collection](https://huggingface.co/collections/remyxai/spacevlms-66a3dbb924756d98e7aec678)
        """)
        # Description for users
        gr.Markdown("""
        ## Instructions
        Upload an image, and the tool will generate a corresponding 3D point cloud visualization of the objects found and an example prompt and response describing a spatial relationship between the objects.
        """)
        with gr.Row():
            # Left Column: Inputs
            with gr.Column():
                # Image upload and processing button in the left column
                image_input = gr.Image(type="filepath", label="Upload an Image")
                generate_button = gr.Button("Generate")
            # Right Column: Outputs
            with gr.Column():
                # 3D Model and Caption Outputs
                model_output = gr.Model3D(label="3D Point Cloud")  # Only used as output
                caption_output = gr.Text(label="Caption")
        # Link the button to process the image and display the outputs
        generate_button.click(
            process_image,  # Your processing function
            inputs=image_input,
            outputs=[model_output, caption_output]
        )
        # Examples section at the bottom
        gr.Examples(
            examples=[  # Update with the path to your example image
                ["./examples/warehouse_rgb.jpg"],
                ["./examples/spooky_doggy.png"],
                ["./examples/bee_and_flower.jpg"],
                ["./examples/road-through-dense-forest.jpg"],
                ["./examples/gears.png"]
            ],
            inputs=image_input,
            label="Example Images",
            examples_per_page=5
        )
        # Citations
        gr.Markdown("""
        ## Citation
        ```
        @article{chen2024spatialvlm,
          title   = {SpatialVLM: Endowing Vision-Language Models with Spatial Reasoning Capabilities},
          author  = {Chen, Boyuan and Xu, Zhuo and Kirmani, Sean and Ichter, Brian and Driess, Danny and Florence, Pete and Sadigh, Dorsa and Guibas, Leonidas and Xia, Fei},
          journal = {arXiv preprint arXiv:2401.12168},
          year    = {2024},
          url     = {https://arxiv.org/abs/2401.12168},
        }
        ```
        """)
    return demo


if __name__ == "__main__":
    # Assigned at module scope so depth_estimation() and florence2() can read them as globals.
    model, transform = depth_pro.create_model_and_transforms(device='cuda')
    florence_model, florence_processor = load_florence2(device='cuda')
    demo = build_demo()
    demo.launch(share=True)