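"""LCM text-to-image and image-to-image generation.

A single class wraps three interchangeable backends: PyTorch (diffusers)
pipelines with optional LCM-LoRA, OpenVINO pipelines (including Flux and
heterogeneous-compute variants), and GGUF diffusion models loaded through
a native shared library.
"""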
import gc
from math import ceil
from typing import Any, List
import random

import numpy as np
import torch
from backend.device import is_openvino_device
from backend.controlnet import (
    load_controlnet_adapters,
    update_controlnet_arguments,
)
from backend.models.lcmdiffusion_setting import (
    DiffusionTask,
    LCMDiffusionSetting,
    LCMLora,
)
from backend.openvino.pipelines import (
    get_ov_image_to_image_pipeline,
    get_ov_text_to_image_pipeline,
    ov_load_taesd,
)
from backend.pipelines.lcm import (
    get_image_to_image_pipeline,
    get_lcm_model_pipeline,
    load_taesd,
)
from backend.pipelines.lcm_lora import get_lcm_lora_pipeline
from constants import DEVICE, GGUF_THREADS
from diffusers import LCMScheduler
from image_ops import resize_pil_image
from backend.openvino.flux_pipeline import get_flux_pipeline
from backend.openvino.ov_hc_stablediffusion_pipeline import OvHcLatentConsistency
from backend.gguf.gguf_diffusion import (
    GGUFDiffusion,
    ModelConfig,
    Txt2ImgConfig,
    SampleMethod,
)
from paths import get_app_path
from pprint import pprint

try:
    # support for token merging; keeping it optional for now
    import tomesd
except ImportError:
    print("tomesd library unavailable; disabling token merging support")
    tomesd = None


class LCMTextToImage:
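    """Text-to-image and image-to-image generation over multiple backends.

    init() creates or rebuilds the underlying pipeline as settings change;
    generate() runs the configured diffusion task and returns the images.
    """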

    def __init__(
        self,
        device: str = "cpu",
    ) -> None:
        self.pipeline = None
        self.use_openvino = False
        self.device = ""
        self.previous_model_id = None
        self.previous_use_tae_sd = False
        self.previous_use_lcm_lora = False
        self.previous_ov_model_id = ""
        self.previous_token_merging = 0.0
        self.previous_safety_checker = False
        self.previous_use_openvino = False
        # Initialize the remaining change-tracking fields read by init()
        # so attribute access is safe even before the first pipeline build.
        self.previous_lcm_lora_base_id = None
        self.previous_lcm_lora_id = None
        self.previous_task_type = None
        self.img_to_img_pipeline = None
        self.is_openvino_init = False
        self.previous_lora = None
        self.task_type = DiffusionTask.text_to_image
        self.previous_use_gguf_model = False
        self.previous_gguf_model = None
        self.torch_data_type = (
            torch.float32 if is_openvino_device() or DEVICE == "mps" else torch.float16
        )
        self.ov_model_id = None
        print(f"Torch datatype : {self.torch_data_type}")

    def _pipeline_to_device(self):
        print(f"Pipeline device : {DEVICE}")
        print(f"Pipeline dtype : {self.torch_data_type}")
        self.pipeline.to(
            torch_device=DEVICE,
            torch_dtype=self.torch_data_type,
        )

    def _add_freeu(self):
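        """Enable FreeU for LCM pipelines.

        The s1/s2/b1/b2 values mirror the settings suggested in the
        diffusers FreeU examples for SD 1.5 and SDXL respectively.
        """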
        pipeline_class = self.pipeline.__class__.__name__
        if isinstance(self.pipeline.scheduler, LCMScheduler):
            if pipeline_class == "StableDiffusionPipeline":
                print("Add FreeU - SD")
                self.pipeline.enable_freeu(
                    s1=0.9,
                    s2=0.2,
                    b1=1.2,
                    b2=1.4,
                )
            elif pipeline_class == "StableDiffusionXLPipeline":
                print("Add FreeU - SDXL")
                self.pipeline.enable_freeu(
                    s1=0.6,
                    s2=0.4,
                    b1=1.1,
                    b2=1.2,
                )

    def _enable_vae_tiling(self):
        self.pipeline.vae.enable_tiling()

    def _update_lcm_scheduler_params(self):
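        """Rebuild the LCM scheduler with a custom beta schedule
        (beta_start=0.001, beta_end=0.01)."""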
        if isinstance(self.pipeline.scheduler, LCMScheduler):
            self.pipeline.scheduler = LCMScheduler.from_config(
                self.pipeline.scheduler.config,
                beta_start=0.001,
                beta_end=0.01,
            )

    def _is_hetero_pipeline(self) -> bool:
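        """Heterogeneous-compute models are identified by a naming
        convention: their OpenVINO model IDs contain "square"."""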
| return "square" in self.ov_model_id.lower() | |

    def _load_ov_hetero_pipeline(self):
        print("Loading Heterogeneous Compute pipeline")
        if DEVICE.upper() == "NPU":
            device = ["NPU", "NPU", "NPU"]
            self.pipeline = OvHcLatentConsistency(self.ov_model_id, device)
        else:
            self.pipeline = OvHcLatentConsistency(self.ov_model_id)

    def _generate_images_hetero_compute(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
    ):
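        """Generate with the heterogeneous-compute pipeline.

        Text-to-image passes no init image and a fixed strength of 1.0;
        image-to-image forwards the configured init image and strength.
        """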
| print("Using OpenVINO ") | |
| if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value: | |
| return [ | |
| self.pipeline.generate( | |
| prompt=lcm_diffusion_setting.prompt, | |
| neg_prompt=lcm_diffusion_setting.negative_prompt, | |
| init_image=None, | |
| strength=1.0, | |
| num_inference_steps=lcm_diffusion_setting.inference_steps, | |
| ) | |
| ] | |
| else: | |
| return [ | |
| self.pipeline.generate( | |
| prompt=lcm_diffusion_setting.prompt, | |
| neg_prompt=lcm_diffusion_setting.negative_prompt, | |
| init_image=lcm_diffusion_setting.init_image, | |
| strength=lcm_diffusion_setting.strength, | |
| num_inference_steps=lcm_diffusion_setting.inference_steps, | |
| ) | |
| ] | |

    def _is_valid_mode(
        self,
        modes: List[bool],
    ) -> bool:
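        """Exactly one backend may be enabled, or none at all (which
        selects the plain LCM model pipeline)."""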
        return modes.count(True) == 1 or modes.count(False) == 3

    def _validate_mode(
        self,
        modes: List[bool],
    ) -> None:
        if not self._is_valid_mode(modes):
            raise ValueError("Invalid mode; delete configs/settings.yaml and retry!")

    def init(
        self,
        device: str = "cpu",
        lcm_diffusion_setting: LCMDiffusionSetting = LCMDiffusionSetting(),
    ) -> None:
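        """Build, rebuild, or reuse the underlying pipeline.

        The pipeline is rebuilt when any relevant setting (model IDs,
        backend, LoRA, tiny auto-encoder, token merging, safety checker,
        task type) differs from the previous call, or when
        rebuild_pipeline is set; otherwise the cached pipeline is reused.
        """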
        # Mode validation: at most one of GGUF, OpenVINO, or LCM-LoRA may be enabled
        modes = [
            lcm_diffusion_setting.use_gguf_model,
            lcm_diffusion_setting.use_openvino,
            lcm_diffusion_setting.use_lcm_lora,
        ]
        self._validate_mode(modes)
        self.device = device
        self.use_openvino = lcm_diffusion_setting.use_openvino
        model_id = lcm_diffusion_setting.lcm_model_id
        use_local_model = lcm_diffusion_setting.use_offline_model
        use_tiny_auto_encoder = lcm_diffusion_setting.use_tiny_auto_encoder
        use_lora = lcm_diffusion_setting.use_lcm_lora
        lcm_lora: LCMLora = lcm_diffusion_setting.lcm_lora
        token_merging = lcm_diffusion_setting.token_merging
        self.ov_model_id = lcm_diffusion_setting.openvino_lcm_model_id

        if lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value:
            lcm_diffusion_setting.init_image = resize_pil_image(
                lcm_diffusion_setting.init_image,
                lcm_diffusion_setting.image_width,
                lcm_diffusion_setting.image_height,
            )

        if (
            self.pipeline is None
            or self.previous_model_id != model_id
            or self.previous_use_tae_sd != use_tiny_auto_encoder
            or self.previous_lcm_lora_base_id != lcm_lora.base_model_id
            or self.previous_lcm_lora_id != lcm_lora.lcm_lora_id
            or self.previous_use_lcm_lora != use_lora
            or self.previous_ov_model_id != self.ov_model_id
            or self.previous_token_merging != token_merging
            or self.previous_safety_checker != lcm_diffusion_setting.use_safety_checker
            or self.previous_use_openvino != lcm_diffusion_setting.use_openvino
            or self.previous_use_gguf_model != lcm_diffusion_setting.use_gguf_model
            or self.previous_gguf_model != lcm_diffusion_setting.gguf_model
            or (
                self.use_openvino
                and (
                    self.previous_task_type != lcm_diffusion_setting.diffusion_task
                    or self.previous_lora != lcm_diffusion_setting.lora
                )
            )
            or lcm_diffusion_setting.rebuild_pipeline
        ):
            if self.use_openvino and is_openvino_device():
                if self.pipeline:
                    del self.pipeline
                    self.pipeline = None
                    gc.collect()
                self.is_openvino_init = True
                if (
                    lcm_diffusion_setting.diffusion_task
                    == DiffusionTask.text_to_image.value
                ):
                    print(
                        f"***** Init Text to image (OpenVINO) - {self.ov_model_id} *****"
                    )
                    if "flux" in self.ov_model_id.lower():
                        print("Loading OpenVINO Flux pipeline")
                        self.pipeline = get_flux_pipeline(
                            self.ov_model_id,
                            lcm_diffusion_setting.use_tiny_auto_encoder,
                        )
                    elif self._is_hetero_pipeline():
                        self._load_ov_hetero_pipeline()
                    else:
                        self.pipeline = get_ov_text_to_image_pipeline(
                            self.ov_model_id,
                            use_local_model,
                        )
                elif (
                    lcm_diffusion_setting.diffusion_task
                    == DiffusionTask.image_to_image.value
                ):
                    if not self.pipeline and self._is_hetero_pipeline():
                        self._load_ov_hetero_pipeline()
                    else:
                        print(
                            f"***** Image to image (OpenVINO) - {self.ov_model_id} *****"
                        )
                        self.pipeline = get_ov_image_to_image_pipeline(
                            self.ov_model_id,
                            use_local_model,
                        )
            elif lcm_diffusion_setting.use_gguf_model:
                model = lcm_diffusion_setting.gguf_model.diffusion_path
                print(f"***** Init Text to image (GGUF) - {model} *****")
                # if self.pipeline:
                #     self.pipeline.terminate()
                #     del self.pipeline
                #     self.pipeline = None
                self._init_gguf_diffusion(lcm_diffusion_setting)
            else:
                if self.pipeline or self.img_to_img_pipeline:
                    self.pipeline = None
                    self.img_to_img_pipeline = None
                    gc.collect()

                controlnet_args = load_controlnet_adapters(lcm_diffusion_setting)
                if use_lora:
                    print(
                        f"***** Init LCM-LoRA pipeline - {lcm_lora.base_model_id} *****"
                    )
                    self.pipeline = get_lcm_lora_pipeline(
                        lcm_lora.base_model_id,
                        lcm_lora.lcm_lora_id,
                        use_local_model,
                        torch_data_type=self.torch_data_type,
                        pipeline_args=controlnet_args,
                    )
                else:
                    print(f"***** Init LCM Model pipeline - {model_id} *****")
                    self.pipeline = get_lcm_model_pipeline(
                        model_id,
                        use_local_model,
                        controlnet_args,
                    )

                self.img_to_img_pipeline = get_image_to_image_pipeline(self.pipeline)

                if tomesd and token_merging > 0.001:
                    print(f"***** Token Merging: {token_merging} *****")
                    tomesd.apply_patch(self.pipeline, ratio=token_merging)
                    tomesd.apply_patch(self.img_to_img_pipeline, ratio=token_merging)

            if use_tiny_auto_encoder:
                if self.use_openvino and is_openvino_device():
                    if self.pipeline.__class__.__name__ != "OVFluxPipeline":
                        print("Using Tiny Auto Encoder (OpenVINO)")
                        ov_load_taesd(
                            self.pipeline,
                            use_local_model,
                        )
                else:
                    print("Using Tiny Auto Encoder")
                    load_taesd(
                        self.pipeline,
                        use_local_model,
                        self.torch_data_type,
                    )
                    load_taesd(
                        self.img_to_img_pipeline,
                        use_local_model,
                        self.torch_data_type,
                    )

            if not self.use_openvino and not is_openvino_device():
                self._pipeline_to_device()

            if not self._is_hetero_pipeline():
                if (
                    lcm_diffusion_setting.diffusion_task
                    == DiffusionTask.image_to_image.value
                    and lcm_diffusion_setting.use_openvino
                ):
                    self.pipeline.scheduler = LCMScheduler.from_config(
                        self.pipeline.scheduler.config,
                    )
                else:
                    if not lcm_diffusion_setting.use_gguf_model:
                        self._update_lcm_scheduler_params()

            if use_lora:
                self._add_freeu()

            self.previous_model_id = model_id
            self.previous_ov_model_id = self.ov_model_id
            self.previous_use_tae_sd = use_tiny_auto_encoder
            self.previous_lcm_lora_base_id = lcm_lora.base_model_id
            self.previous_lcm_lora_id = lcm_lora.lcm_lora_id
            self.previous_use_lcm_lora = use_lora
            self.previous_token_merging = lcm_diffusion_setting.token_merging
            self.previous_safety_checker = lcm_diffusion_setting.use_safety_checker
            self.previous_use_openvino = lcm_diffusion_setting.use_openvino
            self.previous_task_type = lcm_diffusion_setting.diffusion_task
            self.previous_lora = lcm_diffusion_setting.lora.model_copy(deep=True)
            self.previous_use_gguf_model = lcm_diffusion_setting.use_gguf_model
            self.previous_gguf_model = lcm_diffusion_setting.gguf_model.model_copy(
                deep=True
            )
            lcm_diffusion_setting.rebuild_pipeline = False

            if (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.text_to_image.value
            ):
                print(f"Pipeline : {self.pipeline}")
            elif (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.image_to_image.value
            ):
                if self.use_openvino and is_openvino_device():
                    print(f"Pipeline : {self.pipeline}")
                else:
                    print(f"Pipeline : {self.img_to_img_pipeline}")

            if self.use_openvino:
                if lcm_diffusion_setting.lora.enabled:
                    print("Warning: LoRA models are not supported in OpenVINO mode")
            elif not lcm_diffusion_setting.use_gguf_model:
                adapters = self.pipeline.get_active_adapters()
                print(f"Active adapters : {adapters}")

    def _get_timesteps(self):
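        """Return the scheduler's custom timesteps as a one-element list,
        or None when the scheduler config defines none."""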
        time_steps = self.pipeline.scheduler.config.get("timesteps")
        time_steps_value = [int(time_steps)] if time_steps else None
        return time_steps_value

    def generate(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
        reshape: bool = False,
    ) -> Any:
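        """Generate images for the configured task and return them.

        Prepares seeds (sequential when a fixed seed is used for multiple
        images), optionally reshapes and recompiles OpenVINO pipelines,
        applies CLIP skip and safety-checker settings, then dispatches to
        the backend-specific pipeline call.
        """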
        guidance_scale = lcm_diffusion_setting.guidance_scale
        img_to_img_inference_steps = lcm_diffusion_setting.inference_steps
        check_step_value = int(
            lcm_diffusion_setting.inference_steps * lcm_diffusion_setting.strength
        )
        if (
            lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value
            and check_step_value < 1
        ):
            # Ensure at least one effective denoising step after strength scaling
            img_to_img_inference_steps = ceil(1 / lcm_diffusion_setting.strength)
            print(
                f"Strength: {lcm_diffusion_setting.strength}, "
                f"adjusted inference steps: {img_to_img_inference_steps}"
            )

        pipeline_extra_args = {}

        if lcm_diffusion_setting.use_seed:
            cur_seed = lcm_diffusion_setting.seed
            # for multiple images with a fixed seed, use sequential seeds
            seeds = [
                (cur_seed + i) for i in range(lcm_diffusion_setting.number_of_images)
            ]
        else:
            seeds = [
                random.randint(0, 999999999)
                for i in range(lcm_diffusion_setting.number_of_images)
            ]

        if self.use_openvino:
            # no support for generators; try at least to ensure reproducible results for single images
            np.random.seed(seeds[0])
            if self._is_hetero_pipeline():
                torch.manual_seed(seeds[0])
                lcm_diffusion_setting.seed = seeds[0]
        else:
            pipeline_extra_args["generator"] = [
                torch.Generator(device=self.device).manual_seed(s) for s in seeds
            ]

        is_openvino_pipe = lcm_diffusion_setting.use_openvino and is_openvino_device()
        if is_openvino_pipe and not self._is_hetero_pipeline():
            print("Using OpenVINO")
            if reshape and not self.is_openvino_init:
                print("Reshape and compile")
                self.pipeline.reshape(
                    batch_size=-1,
                    height=lcm_diffusion_setting.image_height,
                    width=lcm_diffusion_setting.image_width,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                )
                self.pipeline.compile()

            if self.is_openvino_init:
                self.is_openvino_init = False

        if is_openvino_pipe and self._is_hetero_pipeline():
            return self._generate_images_hetero_compute(lcm_diffusion_setting)
        elif lcm_diffusion_setting.use_gguf_model:
            return self._generate_images_gguf(lcm_diffusion_setting)

        if lcm_diffusion_setting.clip_skip > 1:
            # We follow the convention that "CLIP Skip == 2" means "skip
            # the last layer", so "CLIP Skip == 1" means "no skipping"
            pipeline_extra_args["clip_skip"] = lcm_diffusion_setting.clip_skip - 1

        if not lcm_diffusion_setting.use_safety_checker:
            self.pipeline.safety_checker = None
            if (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.image_to_image.value
                and not is_openvino_pipe
            ):
                self.img_to_img_pipeline.safety_checker = None

        if (
            not lcm_diffusion_setting.use_lcm_lora
            and not lcm_diffusion_setting.use_openvino
            and lcm_diffusion_setting.guidance_scale != 1.0
        ):
            print("Not using LCM-LoRA, so setting guidance_scale to 1.0")
            guidance_scale = 1.0

        controlnet_args = update_controlnet_arguments(lcm_diffusion_setting)

        if lcm_diffusion_setting.use_openvino:
            if (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.text_to_image.value
            ):
                result_images = self.pipeline(
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=lcm_diffusion_setting.inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
            elif (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.image_to_image.value
            ):
                result_images = self.pipeline(
                    image=lcm_diffusion_setting.init_image,
                    strength=lcm_diffusion_setting.strength,
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=img_to_img_inference_steps * 3,
                    guidance_scale=guidance_scale,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
        else:
            if (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.text_to_image.value
            ):
                result_images = self.pipeline(
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=lcm_diffusion_setting.inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                    timesteps=self._get_timesteps(),
                    **pipeline_extra_args,
                    **controlnet_args,
                ).images
            elif (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.image_to_image.value
            ):
                result_images = self.img_to_img_pipeline(
                    image=lcm_diffusion_setting.init_image,
                    strength=lcm_diffusion_setting.strength,
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=img_to_img_inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                    **pipeline_extra_args,
                    **controlnet_args,
                ).images

        for i, seed in enumerate(seeds):
            result_images[i].info["image_seed"] = seed

        return result_images

    def _init_gguf_diffusion(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
    ):
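        """Create a GGUFDiffusion pipeline from the configured model paths."""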
        config = ModelConfig()
        config.model_path = lcm_diffusion_setting.gguf_model.diffusion_path
        config.diffusion_model_path = lcm_diffusion_setting.gguf_model.diffusion_path
        config.clip_l_path = lcm_diffusion_setting.gguf_model.clip_path
        config.t5xxl_path = lcm_diffusion_setting.gguf_model.t5xxl_path
        config.vae_path = lcm_diffusion_setting.gguf_model.vae_path
        config.n_threads = GGUF_THREADS
        print(f"GGUF Threads : {GGUF_THREADS}")
        print("GGUF - Model config")
        pprint(lcm_diffusion_setting.gguf_model.model_dump())
        self.pipeline = GGUFDiffusion(
            get_app_path(),  # Place DLL in fastsdcpu folder
            config,
            True,
        )

    def _generate_images_gguf(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
    ):
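        """Run a text-to-image generation using the GGUF backend."""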
        if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value:
            t2iconfig = Txt2ImgConfig()
            t2iconfig.prompt = lcm_diffusion_setting.prompt
            t2iconfig.batch_count = lcm_diffusion_setting.number_of_images
            t2iconfig.cfg_scale = lcm_diffusion_setting.guidance_scale
            t2iconfig.height = lcm_diffusion_setting.image_height
            t2iconfig.width = lcm_diffusion_setting.image_width
            t2iconfig.sample_steps = lcm_diffusion_setting.inference_steps
            t2iconfig.sample_method = SampleMethod.EULER
            if lcm_diffusion_setting.use_seed:
                t2iconfig.seed = lcm_diffusion_setting.seed
            else:
                t2iconfig.seed = -1

            return self.pipeline.generate_text2mg(t2iconfig)
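

# A minimal usage sketch (not part of the original module). It assumes the
# fastsdcpu environment is available (backend modules, constants, default
# models) and that LCMDiffusionSetting's defaults select a text-to-image
# task; the prompt value and output path below are illustrative only.
if __name__ == "__main__":
    setting = LCMDiffusionSetting()
    setting.prompt = "a cup of coffee on a wooden table"
    lcm = LCMTextToImage(device=DEVICE)
    lcm.init(device=DEVICE, lcm_diffusion_setting=setting)
    images = lcm.generate(setting)  # list of PIL images on the torch path
    if images:
        images[0].save("out.png")  # hypothetical output path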