Spaces:
Running
Running
from typing import Any | |
from diffusers import ( | |
DiffusionPipeline, | |
AutoencoderTiny, | |
LCMScheduler, | |
UNet2DConditionModel, | |
) | |
from os import path | |
import torch | |
from backend.models.lcmdiffusion_setting import LCMDiffusionSetting | |
import numpy as np | |
from constants import ( | |
DEVICE, | |
LCM_DEFAULT_MODEL, | |
TAESD_MODEL, | |
TAESDXL_MODEL, | |
TAESD_MODEL_OPENVINO, | |
) | |
from huggingface_hub import model_info | |
from backend.models.lcmdiffusion_setting import LCMLora | |
from backend.device import is_openvino_device | |
if is_openvino_device(): | |
from huggingface_hub import snapshot_download | |
from optimum.intel.openvino.modeling_diffusion import OVModelVaeDecoder, OVBaseModel | |
# from optimum.intel.openvino.modeling_diffusion import OVStableDiffusionPipeline | |
from backend.lcmdiffusion.pipelines.openvino.lcm_ov_pipeline import ( | |
OVStableDiffusionPipeline, | |
) | |
from backend.lcmdiffusion.pipelines.openvino.lcm_scheduler import ( | |
LCMScheduler as OpenVinoLCMscheduler, | |
) | |
class CustomOVModelVaeDecoder(OVModelVaeDecoder): | |
def __init__( | |
self, | |
model, | |
parent_model, | |
ov_config=None, | |
model_dir=None, | |
): | |
super(OVModelVaeDecoder, self).__init__( | |
model, | |
parent_model, | |
ov_config, | |
"vae_decoder", | |
model_dir, | |
) | |
class LCMTextToImage: | |
def __init__( | |
self, | |
device: str = "cpu", | |
) -> None: | |
self.pipeline = None | |
self.use_openvino = False | |
self.device = "" | |
self.previous_model_id = None | |
self.previous_use_tae_sd = False | |
self.previous_use_lcm_lora = False | |
self.torch_data_type = ( | |
torch.float32 if is_openvino_device() or DEVICE == "mps" else torch.float16 | |
) | |
print(f"Torch datatype : {self.torch_data_type}") | |
def _get_lcm_pipeline( | |
self, | |
lcm_model_id: str, | |
base_model_id: str, | |
use_local_model: bool, | |
): | |
pipeline = None | |
unet = UNet2DConditionModel.from_pretrained( | |
lcm_model_id, | |
torch_dtype=torch.float32, | |
local_files_only=use_local_model | |
# resume_download=True, | |
) | |
pipeline = DiffusionPipeline.from_pretrained( | |
base_model_id, | |
unet=unet, | |
torch_dtype=torch.float32, | |
local_files_only=use_local_model | |
# resume_download=True, | |
) | |
pipeline.scheduler = LCMScheduler.from_config(pipeline.scheduler.config) | |
return pipeline | |
def get_tiny_decoder_vae_model(self) -> str: | |
pipeline_class = self.pipeline.__class__.__name__ | |
print(f"Pipeline class : {pipeline_class}") | |
if ( | |
pipeline_class == "LatentConsistencyModelPipeline" | |
or pipeline_class == "StableDiffusionPipeline" | |
): | |
return TAESD_MODEL | |
elif pipeline_class == "StableDiffusionXLPipeline": | |
return TAESDXL_MODEL | |
elif pipeline_class == "OVStableDiffusionPipeline": | |
return TAESD_MODEL_OPENVINO | |
def _get_lcm_model_pipeline( | |
self, | |
model_id: str, | |
use_local_model, | |
): | |
pipeline = None | |
if model_id == LCM_DEFAULT_MODEL: | |
pipeline = DiffusionPipeline.from_pretrained( | |
model_id, | |
local_files_only=use_local_model, | |
) | |
elif model_id == "latent-consistency/lcm-sdxl": | |
pipeline = self._get_lcm_pipeline( | |
model_id, | |
"stabilityai/stable-diffusion-xl-base-1.0", | |
use_local_model, | |
) | |
elif model_id == "latent-consistency/lcm-ssd-1b": | |
pipeline = self._get_lcm_pipeline( | |
model_id, | |
"segmind/SSD-1B", | |
use_local_model, | |
) | |
return pipeline | |
def _get_lcm_lora_pipeline( | |
self, | |
base_model_id: str, | |
lcm_lora_id: str, | |
use_local_model: bool, | |
): | |
pipeline = DiffusionPipeline.from_pretrained( | |
base_model_id, | |
torch_dtype=self.torch_data_type, | |
local_files_only=use_local_model, | |
) | |
pipeline.load_lora_weights( | |
lcm_lora_id, | |
local_files_only=use_local_model, | |
) | |
pipeline.scheduler = LCMScheduler.from_config(pipeline.scheduler.config) | |
pipeline.fuse_lora() | |
pipeline.unet.to(memory_format=torch.channels_last) | |
return pipeline | |
def _pipeline_to_device(self): | |
print(f"Pipeline device : {DEVICE}") | |
print(f"Pipeline dtype : {self.torch_data_type}") | |
self.pipeline.to( | |
torch_device=DEVICE, | |
torch_dtype=self.torch_data_type, | |
) | |
def _add_freeu(self): | |
pipeline_class = self.pipeline.__class__.__name__ | |
if pipeline_class == "StableDiffusionPipeline": | |
print("Add FreeU - SD") | |
self.pipeline.enable_freeu( | |
s1=0.9, | |
s2=0.2, | |
b1=1.2, | |
b2=1.4, | |
) | |
elif pipeline_class == "StableDiffusionXLPipeline": | |
print("Add FreeU - SDXL") | |
self.pipeline.enable_freeu( | |
s1=0.6, | |
s2=0.4, | |
b1=1.1, | |
b2=1.2, | |
) | |
def init( | |
self, | |
model_id: str, | |
use_openvino: bool = False, | |
device: str = "cpu", | |
use_local_model: bool = False, | |
use_tiny_auto_encoder: bool = False, | |
use_lora: bool = False, | |
lcm_lora: LCMLora = LCMLora(), | |
) -> None: | |
self.device = device | |
self.use_openvino = use_openvino | |
print(f"use_openvino {self.use_openvino}") | |
print(f"is_openvino {is_openvino_device()}") | |
if ( | |
self.pipeline is None | |
or self.previous_model_id != model_id | |
or self.previous_use_tae_sd != use_tiny_auto_encoder | |
or self.previous_lcm_lora_base_id != lcm_lora.base_model_id | |
or self.previous_lcm_lora_id != lcm_lora.lcm_lora_id | |
or self.previous_use_lcm_lora != use_lora | |
): | |
if self.use_openvino and is_openvino_device(): | |
if self.pipeline: | |
del self.pipeline | |
self.pipeline = None | |
self.pipeline = OVStableDiffusionPipeline.from_pretrained( | |
model_id, | |
local_files_only=use_local_model, | |
ov_config={"CACHE_DIR": ""}, | |
device=DEVICE.upper(), | |
) | |
if use_tiny_auto_encoder: | |
print("Using Tiny Auto Encoder (OpenVINO)") | |
taesd_dir = snapshot_download( | |
repo_id=self.get_tiny_decoder_vae_model(), | |
local_files_only=use_local_model, | |
) | |
self.pipeline.vae_decoder = CustomOVModelVaeDecoder( | |
model=OVBaseModel.load_model( | |
f"{taesd_dir}/vae_decoder/openvino_model.xml" | |
), | |
parent_model=self.pipeline, | |
model_dir=taesd_dir, | |
) | |
else: | |
if self.pipeline: | |
del self.pipeline | |
self.pipeline = None | |
if use_lora: | |
print("Init LCM-LoRA pipeline") | |
self.pipeline = self._get_lcm_lora_pipeline( | |
lcm_lora.base_model_id, | |
lcm_lora.lcm_lora_id, | |
use_local_model, | |
) | |
else: | |
print("Init LCM Model pipeline") | |
self.pipeline = self._get_lcm_model_pipeline( | |
model_id, | |
use_local_model, | |
) | |
if use_tiny_auto_encoder: | |
vae_model = self.get_tiny_decoder_vae_model() | |
print(f"Using Tiny Auto Encoder {vae_model}") | |
self.pipeline.vae = AutoencoderTiny.from_pretrained( | |
vae_model, | |
torch_dtype=torch.float32, | |
local_files_only=use_local_model, | |
) | |
self._pipeline_to_device() | |
self.previous_model_id = model_id | |
self.previous_use_tae_sd = use_tiny_auto_encoder | |
self.previous_lcm_lora_base_id = lcm_lora.base_model_id | |
self.previous_lcm_lora_id = lcm_lora.lcm_lora_id | |
self.previous_use_lcm_lora = use_lora | |
print(f"Model :{model_id}") | |
print(f"Pipeline : {self.pipeline}") | |
self.pipeline.scheduler = LCMScheduler.from_config( | |
self.pipeline.scheduler.config, | |
beta_start=0.001, | |
beta_end=0.01, | |
) | |
if use_lora: | |
self._add_freeu() | |
def generate( | |
self, | |
lcm_diffusion_setting: LCMDiffusionSetting, | |
reshape: bool = False, | |
) -> Any: | |
guidance_scale = lcm_diffusion_setting.guidance_scale | |
if lcm_diffusion_setting.use_seed: | |
cur_seed = lcm_diffusion_setting.seed | |
if self.use_openvino: | |
np.random.seed(cur_seed) | |
else: | |
torch.manual_seed(cur_seed) | |
if lcm_diffusion_setting.use_openvino and is_openvino_device(): | |
print("Using OpenVINO") | |
if reshape: | |
print("Reshape and compile") | |
self.pipeline.reshape( | |
batch_size=-1, | |
height=lcm_diffusion_setting.image_height, | |
width=lcm_diffusion_setting.image_width, | |
num_images_per_prompt=lcm_diffusion_setting.number_of_images, | |
) | |
self.pipeline.compile() | |
if not lcm_diffusion_setting.use_safety_checker: | |
self.pipeline.safety_checker = None | |
if ( | |
not lcm_diffusion_setting.use_lcm_lora | |
and not lcm_diffusion_setting.use_openvino | |
and lcm_diffusion_setting.guidance_scale != 1.0 | |
): | |
print("Not using LCM-LoRA so setting guidance_scale 1.0") | |
guidance_scale = 1.0 | |
if lcm_diffusion_setting.use_openvino: | |
result_images = self.pipeline( | |
prompt=lcm_diffusion_setting.prompt, | |
negative_prompt=lcm_diffusion_setting.negative_prompt, | |
num_inference_steps=lcm_diffusion_setting.inference_steps, | |
guidance_scale=guidance_scale, | |
width=lcm_diffusion_setting.image_width, | |
height=lcm_diffusion_setting.image_height, | |
num_images_per_prompt=lcm_diffusion_setting.number_of_images, | |
).images | |
else: | |
result_images = self.pipeline( | |
prompt=lcm_diffusion_setting.prompt, | |
negative_prompt=lcm_diffusion_setting.negative_prompt, | |
num_inference_steps=lcm_diffusion_setting.inference_steps, | |
guidance_scale=guidance_scale, | |
width=lcm_diffusion_setting.image_width, | |
height=lcm_diffusion_setting.image_height, | |
num_images_per_prompt=lcm_diffusion_setting.number_of_images, | |
).images | |
return result_images | |