Spaces:
Running
on
Zero
Running
on
Zero
| from pathlib import Path | |
| from typing import Optional | |
| from PIL import Image | |
| from PIL.ImageOps import exif_transpose | |
| from torch.utils.data import Dataset | |
| from torchvision import transforms | |
| import json | |
| import random | |
| from facenet_pytorch import MTCNN | |
| import torch | |
| from utils.utils import extract_faces_and_landmarks, REFERNCE_FACIAL_POINTS_RELATIVE | |
| def load_image(image_path: str) -> Image: | |
| image = Image.open(image_path) | |
| image = exif_transpose(image) | |
| if not image.mode == "RGB": | |
| image = image.convert("RGB") | |
| return image | |
| class ImageDataset(Dataset): | |
| """ | |
| A dataset to prepare the instance and class images with the prompts for fine-tuning the model. | |
| It pre-processes the images. | |
| """ | |
| def __init__( | |
| self, | |
| instance_data_root, | |
| instance_prompt, | |
| metadata_path: Optional[str] = None, | |
| prompt_in_filename=False, | |
| use_only_vanilla_for_encoder=False, | |
| concept_placeholder='a face', | |
| size=1024, | |
| center_crop=False, | |
| aug_images=False, | |
| use_only_decoder_prompts=False, | |
| crop_head_for_encoder_image=False, | |
| random_target_prob=0.0, | |
| ): | |
| self.mtcnn = MTCNN(device='cuda:0') | |
| self.mtcnn.forward = self.mtcnn.detect | |
| resize_factor = 1.3 | |
| self.resized_reference_points = REFERNCE_FACIAL_POINTS_RELATIVE / resize_factor + (resize_factor - 1) / (2 * resize_factor) | |
| self.size = size | |
| self.center_crop = center_crop | |
| self.concept_placeholder = concept_placeholder | |
| self.prompt_in_filename = prompt_in_filename | |
| self.aug_images = aug_images | |
| self.instance_prompt = instance_prompt | |
| self.custom_instance_prompts = None | |
| self.name_to_label = None | |
| self.crop_head_for_encoder_image = crop_head_for_encoder_image | |
| self.random_target_prob = random_target_prob | |
| self.use_only_decoder_prompts = use_only_decoder_prompts | |
| self.instance_data_root = Path(instance_data_root) | |
| if not self.instance_data_root.exists(): | |
| raise ValueError(f"Instance images root {self.instance_data_root} doesn't exist.") | |
| if metadata_path is not None: | |
| with open(metadata_path, 'r') as f: | |
| self.name_to_label = json.load(f) # dict of filename: label | |
| # Create a reversed mapping | |
| self.label_to_names = {} | |
| for name, label in self.name_to_label.items(): | |
| if use_only_vanilla_for_encoder and 'vanilla' not in name: | |
| continue | |
| if label not in self.label_to_names: | |
| self.label_to_names[label] = [] | |
| self.label_to_names[label].append(name) | |
| self.all_paths = [self.instance_data_root / filename for filename in self.name_to_label.keys()] | |
| # Verify all paths exist | |
| n_all_paths = len(self.all_paths) | |
| self.all_paths = [path for path in self.all_paths if path.exists()] | |
| print(f'Found {len(self.all_paths)} out of {n_all_paths} paths.') | |
| else: | |
| self.all_paths = [path for path in list(Path(instance_data_root).glob('**/*')) if | |
| path.suffix.lower() in [".png", ".jpg", ".jpeg"]] | |
| # Sort by name so that order for validation remains the same across runs | |
| self.all_paths = sorted(self.all_paths, key=lambda x: x.stem) | |
| self.custom_instance_prompts = None | |
| self._length = len(self.all_paths) | |
| self.class_data_root = None | |
| self.image_transforms = transforms.Compose( | |
| [ | |
| transforms.Resize(size, interpolation=transforms.InterpolationMode.BILINEAR), | |
| transforms.CenterCrop(size) if center_crop else transforms.RandomCrop(size), | |
| transforms.ToTensor(), | |
| transforms.Normalize([0.5], [0.5]), | |
| ] | |
| ) | |
| if self.prompt_in_filename: | |
| self.prompts_set = set([self._path_to_prompt(path) for path in self.all_paths]) | |
| else: | |
| self.prompts_set = set([self.instance_prompt]) | |
| if self.aug_images: | |
| self.aug_transforms = transforms.Compose( | |
| [ | |
| transforms.RandomResizedCrop(size, scale=(0.8, 1.0), ratio=(1.0, 1.0)), | |
| transforms.RandomHorizontalFlip(p=0.5) | |
| ] | |
| ) | |
| def __len__(self): | |
| return self._length | |
| def _path_to_prompt(self, path): | |
| # Remove the extension and seed | |
| split_path = path.stem.split('_') | |
| while split_path[-1].isnumeric(): | |
| split_path = split_path[:-1] | |
| prompt = ' '.join(split_path) | |
| # Replace placeholder in prompt with training placeholder | |
| prompt = prompt.replace('conceptname', self.concept_placeholder) | |
| return prompt | |
| def __getitem__(self, index): | |
| example = {} | |
| instance_path = self.all_paths[index] | |
| instance_image = load_image(instance_path) | |
| example["instance_images"] = self.image_transforms(instance_image) | |
| if self.prompt_in_filename: | |
| example["instance_prompt"] = self._path_to_prompt(instance_path) | |
| else: | |
| example["instance_prompt"] = self.instance_prompt | |
| if self.name_to_label is None: | |
| # If no labels, simply take the same image but with different augmentation | |
| example["encoder_images"] = self.aug_transforms(example["instance_images"]) if self.aug_images else example["instance_images"] | |
| example["encoder_prompt"] = example["instance_prompt"] | |
| else: | |
| # Randomly select another image with the same label | |
| instance_name = str(instance_path.relative_to(self.instance_data_root)) | |
| instance_label = self.name_to_label[instance_name] | |
| label_set = set(self.label_to_names[instance_label]) | |
| if len(label_set) == 1: | |
| # We are not supposed to have only one image per label, but just in case | |
| encoder_image_name = instance_name | |
| print(f'WARNING: Only one image for label {instance_label}.') | |
| else: | |
| encoder_image_name = random.choice(list(label_set - {instance_name})) | |
| encoder_image = load_image(self.instance_data_root / encoder_image_name) | |
| example["encoder_images"] = self.image_transforms(encoder_image) | |
| if self.prompt_in_filename: | |
| example["encoder_prompt"] = self._path_to_prompt(self.instance_data_root / encoder_image_name) | |
| else: | |
| example["encoder_prompt"] = self.instance_prompt | |
| if self.crop_head_for_encoder_image: | |
| example["encoder_images"] = extract_faces_and_landmarks(example["encoder_images"][None], self.size, self.mtcnn, self.resized_reference_points)[0][0] | |
| example["encoder_prompt"] = example["encoder_prompt"].format(placeholder="<ph>") | |
| example["instance_prompt"] = example["instance_prompt"].format(placeholder="<s*>") | |
| if random.random() < self.random_target_prob: | |
| random_path = random.choice(self.all_paths) | |
| random_image = load_image(random_path) | |
| example["instance_images"] = self.image_transforms(random_image) | |
| if self.prompt_in_filename: | |
| example["instance_prompt"] = self._path_to_prompt(random_path) | |
| if self.use_only_decoder_prompts: | |
| example["encoder_prompt"] = example["instance_prompt"] | |
| return example | |
| def collate_fn(examples, with_prior_preservation=False): | |
| pixel_values = [example["instance_images"] for example in examples] | |
| encoder_pixel_values = [example["encoder_images"] for example in examples] | |
| prompts = [example["instance_prompt"] for example in examples] | |
| encoder_prompts = [example["encoder_prompt"] for example in examples] | |
| if with_prior_preservation: | |
| raise NotImplementedError("Prior preservation not implemented.") | |
| pixel_values = torch.stack(pixel_values) | |
| pixel_values = pixel_values.to(memory_format=torch.contiguous_format).float() | |
| encoder_pixel_values = torch.stack(encoder_pixel_values) | |
| encoder_pixel_values = encoder_pixel_values.to(memory_format=torch.contiguous_format).float() | |
| batch = {"pixel_values": pixel_values, "encoder_pixel_values": encoder_pixel_values, | |
| "prompts": prompts, "encoder_prompts": encoder_prompts} | |
| return batch | |