| from __future__ import annotations | |
| import datetime | |
| import os | |
| import pathlib | |
| import shlex | |
| import shutil | |
| import subprocess | |
| import gradio as gr | |
| import PIL.Image | |
| import slugify | |
| import torch | |
| from huggingface_hub import HfApi | |
| from app_upload import LoRAModelUploader | |
| from utils import save_model_card | |
| URL_TO_JOIN_LORA_LIBRARY_ORG = 'https://huggingface.co/organizations/lora-library/share/hjetHAcKjnPHXhHfbeEcqnBqmhgilFfpOL' | |
| def pad_image(image: PIL.Image.Image) -> PIL.Image.Image: | |
| w, h = image.size | |
| if w == h: | |
| return image | |
| elif w > h: | |
| new_image = PIL.Image.new(image.mode, (w, w), (0, 0, 0)) | |
| new_image.paste(image, (0, (w - h) // 2)) | |
| return new_image | |
| else: | |
| new_image = PIL.Image.new(image.mode, (h, h), (0, 0, 0)) | |
| new_image.paste(image, ((h - w) // 2, 0)) | |
| return new_image | |
| class Trainer: | |
| def __init__(self, hf_token: str | None = None): | |
| self.hf_token = hf_token | |
| self.api = HfApi(token=hf_token) | |
| self.model_uploader = LoRAModelUploader(hf_token) | |
| def prepare_dataset(self, instance_images: list, resolution: int, | |
| instance_data_dir: pathlib.Path) -> None: | |
| shutil.rmtree(instance_data_dir, ignore_errors=True) | |
| instance_data_dir.mkdir(parents=True) | |
| for i, temp_path in enumerate(instance_images): | |
| image = PIL.Image.open(temp_path.name) | |
| image = pad_image(image) | |
| image = image.resize((resolution, resolution)) | |
| image = image.convert('RGB') | |
| out_path = instance_data_dir / f'{i:03d}.jpg' | |
| image.save(out_path, format='JPEG', quality=100) | |
| def join_lora_library_org(self) -> None: | |
| subprocess.run( | |
| shlex.split( | |
| f'curl -X POST -H "Authorization: Bearer {self.hf_token}" -H "Content-Type: application/json" {URL_TO_JOIN_LORA_LIBRARY_ORG}' | |
| )) | |
| def run( | |
| self, | |
| instance_images: list | None, | |
| instance_prompt: str, | |
| output_model_name: str, | |
| overwrite_existing_model: bool, | |
| validation_prompt: str, | |
| base_model: str, | |
| resolution_s: str, | |
| n_steps: int, | |
| learning_rate: float, | |
| gradient_accumulation: int, | |
| seed: int, | |
| fp16: bool, | |
| use_8bit_adam: bool, | |
| checkpointing_steps: int, | |
| use_wandb: bool, | |
| validation_epochs: int, | |
| upload_to_hub: bool, | |
| use_private_repo: bool, | |
| delete_existing_repo: bool, | |
| upload_to: str, | |
| remove_gpu_after_training: bool, | |
| ) -> str: | |
| if not torch.cuda.is_available(): | |
| raise gr.Error('CUDA is not available.') | |
| if instance_images is None: | |
| raise gr.Error('You need to upload images.') | |
| if not instance_prompt: | |
| raise gr.Error('The instance prompt is missing.') | |
| if not validation_prompt: | |
| raise gr.Error('The validation prompt is missing.') | |
| resolution = int(resolution_s) | |
| if not output_model_name: | |
| timestamp = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') | |
| output_model_name = f'lora-dreambooth-{timestamp}' | |
| output_model_name = slugify.slugify(output_model_name) | |
| repo_dir = pathlib.Path(__file__).parent | |
| output_dir = repo_dir / 'experiments' / output_model_name | |
| if overwrite_existing_model or upload_to_hub: | |
| shutil.rmtree(output_dir, ignore_errors=True) | |
| output_dir.mkdir(parents=True) | |
| instance_data_dir = repo_dir / 'training_data' / output_model_name | |
| self.prepare_dataset(instance_images, resolution, instance_data_dir) | |
| if upload_to_hub: | |
| self.join_lora_library_org() | |
| command = f''' | |
| accelerate launch train_dreambooth_lora.py \ | |
| --pretrained_model_name_or_path={base_model} \ | |
| --instance_data_dir={instance_data_dir} \ | |
| --output_dir={output_dir} \ | |
| --instance_prompt="{instance_prompt}" \ | |
| --resolution={resolution} \ | |
| --train_batch_size=1 \ | |
| --gradient_accumulation_steps={gradient_accumulation} \ | |
| --learning_rate={learning_rate} \ | |
| --lr_scheduler=constant \ | |
| --lr_warmup_steps=0 \ | |
| --max_train_steps={n_steps} \ | |
| --checkpointing_steps={checkpointing_steps} \ | |
| --validation_prompt="{validation_prompt}" \ | |
| --validation_epochs={validation_epochs} \ | |
| --seed={seed} | |
| ''' | |
| if fp16: | |
| command += ' --mixed_precision fp16' | |
| if use_8bit_adam: | |
| command += ' --use_8bit_adam' | |
| if use_wandb: | |
| command += ' --report_to wandb' | |
| with open(output_dir / 'train.sh', 'w') as f: | |
| command_s = ' '.join(command.split()) | |
| f.write(command_s) | |
| subprocess.run(shlex.split(command)) | |
| save_model_card(save_dir=output_dir, | |
| base_model=base_model, | |
| instance_prompt=instance_prompt, | |
| test_prompt=validation_prompt, | |
| test_image_dir='test_images') | |
| message = 'Training completed!' | |
| print(message) | |
| if upload_to_hub: | |
| upload_message = self.model_uploader.upload_lora_model( | |
| folder_path=output_dir.as_posix(), | |
| repo_name=output_model_name, | |
| upload_to=upload_to, | |
| private=use_private_repo, | |
| delete_existing_repo=delete_existing_repo) | |
| print(upload_message) | |
| message = message + '\n' + upload_message | |
| if remove_gpu_after_training: | |
| space_id = os.getenv('SPACE_ID') | |
| if space_id: | |
| self.api.request_space_hardware(repo_id=space_id, | |
| hardware='cpu-basic') | |
| return message | |