Spaces:
Runtime error
Runtime error
| import os | |
| import random | |
| import json | |
| from PIL import Image | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| import torch.nn.functional as F | |
| from pycocotools import mask | |
| from transformers import CLIPImageProcessor | |
| from VisualSearch.model.llava import conversation as conversation_lib | |
| from transformers import OwlViTProcessor | |
| from VisualSearch.utils.utils import box_xyxy_to_cxcywh, expand2square | |
| from VisualSearch.utils.utils import ANSWER_LIST, SHORT_QUESTION_LIST | |
| class MixedGroundingDataset(torch.utils.data.Dataset): | |
| pixel_mean = torch.Tensor([123.675, 116.28, 103.53]).view(-1, 1, 1) | |
| pixel_std = torch.Tensor([58.395, 57.12, 57.375]).view(-1, 1, 1) | |
| img_size = 1024 | |
| ignore_label = 255 | |
| def __init__( | |
| self, | |
| base_dir, | |
| tokenizer, | |
| vision_tower, | |
| samples_per_epoch=500 * 8 * 2 * 10, | |
| precision: str = "fp32", | |
| num_classes_per_sample: int = 3, | |
| exclude_val=False, | |
| ): | |
| self.samples_per_epoch = samples_per_epoch | |
| self.num_classes_per_sample = num_classes_per_sample | |
| self.base_dir = base_dir | |
| self.tokenizer = tokenizer | |
| self.precision = precision | |
| self.transform = OwlViTProcessor.from_pretrained("google/owlvit-base-patch16") | |
| self.clip_image_processor = CLIPImageProcessor.from_pretrained(vision_tower) | |
| self.short_question_list = SHORT_QUESTION_LIST | |
| self.answer_list = ANSWER_LIST | |
| with open(os.path.join(base_dir, 'MixedGrounding', 'goldG_train.json')) as f: | |
| self.images = json.load(f) | |
| def __len__(self): | |
| return self.samples_per_epoch | |
| def preprocess(self, x: torch.Tensor) -> torch.Tensor: | |
| """Normalize pixel values and pad to a square input.""" | |
| # Normalize colors | |
| x = (x - self.pixel_mean) / self.pixel_std | |
| # Pad | |
| h, w = x.shape[-2:] | |
| padh = self.img_size - h | |
| padw = self.img_size - w | |
| x = F.pad(x, (0, padw, 0, padh)) | |
| return x | |
| def __getitem__(self, idx): | |
| idx = random.randint(0, len(self.images) - 1) | |
| image_info = self.images[idx] | |
| image_data_source = image_info['data_source'] | |
| file_name = image_info["file_name"] | |
| assert image_data_source in ['coco', 'vg', 'flickr'] | |
| if image_data_source == 'coco': | |
| image_path = os.path.join(self.base_dir, 'coco2014/train2014', file_name) | |
| elif image_data_source == 'vg': | |
| image_path = os.path.join(self.base_dir, 'MixedGrounding/GQA/images', file_name) | |
| else: | |
| image_path = os.path.join(self.base_dir, 'MixedGrounding/flickr30k-images', file_name) | |
| caption = image_info['caption'] | |
| instances = image_info['instances'] | |
| if len(instances) == 0: | |
| return self.__getitem__(0) | |
| if len(instances) >= self.num_classes_per_sample: | |
| sampled_inds = np.random.choice( | |
| list(range(len(instances))), size=self.num_classes_per_sample, replace=False | |
| ) | |
| else: | |
| sampled_inds = list(range(len(instances))) | |
| sampled_classes = sampled_inds | |
| image = cv2.imread(image_path) | |
| image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
| # preprocess image for clip | |
| image_clip = self.clip_image_processor.preprocess( | |
| expand2square(Image.open(image_path).convert('RGB'), tuple(int(x*255) for x in self.clip_image_processor.image_mean)), return_tensors="pt")["pixel_values"][0] | |
| original_size = image.shape[:2] | |
| image = self.transform(images=image, return_tensors="pt")['pixel_values'][0] | |
| resize = image.shape[:2] | |
| questions = [] | |
| answers = [] | |
| bboxes_labels = [] | |
| for sample_ind in sampled_inds: | |
| text = [] | |
| tokens_positive = instances[sample_ind]['tokens_positive'] | |
| for token in tokens_positive: | |
| text.append(caption[token[0]:token[1]]) | |
| text = " ".join(text) | |
| text = text.strip() | |
| question_template = random.choice(self.short_question_list) | |
| questions.append(question_template.format(class_name=text.lower())) | |
| answers.append(random.choice(self.answer_list)) | |
| cur_bboxes = [instances[sample_ind]['bbox']] | |
| cur_bboxes = torch.tensor(cur_bboxes).view(-1, 4) | |
| # xywh to x1y1x2y2 | |
| cur_bboxes[:, 2:] += cur_bboxes[:, :2] | |
| cur_bboxes[:, 0::2].clamp_(min=0, max=original_size[1]) | |
| cur_bboxes[:, 1::2].clamp_(min=0, max=original_size[0]) | |
| keep = (cur_bboxes[:, 3] > cur_bboxes[:, 1]) & (cur_bboxes[:, 2] > cur_bboxes[:, 0]) | |
| cur_bboxes = cur_bboxes[keep] | |
| cur_bboxes = box_xyxy_to_cxcywh(cur_bboxes) | |
| cur_bboxes = cur_bboxes / torch.tensor([original_size[1], original_size[0], original_size[1], original_size[0]], dtype=torch.float32) | |
| if len(cur_bboxes) == 0: | |
| return self.__getitem__(0) | |
| bboxes_labels.append(cur_bboxes) | |
| conversations = [] | |
| conv = conversation_lib.default_conversation.copy() | |
| i = 0 | |
| while i < len(questions): | |
| conv.messages = [] | |
| conv.append_message(conv.roles[0], questions[i]) | |
| conv.append_message(conv.roles[1], answers[i]) | |
| conversations.append(conv.get_prompt()) | |
| i += 1 | |
| bboxes_valid = [1]*len(bboxes_labels) | |
| masks_valid = [0]*len(bboxes_labels) | |
| masks = torch.rand(len(bboxes_labels), *original_size) | |
| label = torch.ones(original_size) * self.ignore_label | |
| return ( | |
| image_path, | |
| image, | |
| image_clip, | |
| conversations, | |
| masks, | |
| label, | |
| bboxes_labels, | |
| bboxes_valid, | |
| masks_valid, | |
| resize, | |
| questions, | |
| sampled_classes, | |
| ) | |