|
import inspect |
|
import numpy as np |
|
|
|
from openvino.runtime import Core |
|
|
|
from transformers import CLIPTokenizer |
|
|
|
from tqdm import tqdm |
|
from huggingface_hub import hf_hub_download |
|
from diffusers import LMSDiscreteScheduler, PNDMScheduler |
|
import cv2 |
|
|
|
|
|
def result(var):
    """Return the first value held by the mapping ``var``.

    The rest of this file uses it to unwrap the dict-like object returned
    by ``infer_new_request`` into its first output.

    Raises:
        StopIteration: if ``var`` contains no values.
    """
    outputs = iter(var.values())
    first_output = next(outputs)
    return first_output
|
|
|
|
|
class StableDiffusionEngine:
    """Stable Diffusion inference pipeline running on OpenVINO compiled models.

    The engine downloads four networks (text encoder, UNet, VAE decoder and
    VAE encoder) from a Hugging Face Hub repository, compiles them for the
    requested device and exposes the whole pipeline through ``__call__``.
    Depending on the arguments, ``__call__`` starts from random latents,
    from a noised encoding of ``init_image``, or additionally keeps part of
    ``init_image`` fixed according to ``mask``.
    """

    def __init__(
        self,
        scheduler,
        model="bes-dev/stable-diffusion-v1-4-openvino",
        tokenizer="openai/clip-vit-large-patch14",
        device="CPU"
    ):
        """Load the tokenizer and compile every network.

        Args:
            scheduler: scheduler object providing ``set_timesteps``,
                ``timesteps``, ``add_noise`` and ``step``.
            model: Hugging Face Hub repo id holding the ``*.xml`` / ``*.bin``
                model files.
            tokenizer: name or path handed to ``CLIPTokenizer.from_pretrained``.
            device: device name handed to ``Core.compile_model``.
        """
        self.tokenizer = CLIPTokenizer.from_pretrained(tokenizer)
        self.scheduler = scheduler

        self.core = Core()

        self._text_encoder, self.text_encoder = self._load_model(
            model, "text_encoder", device
        )

        self._unet, self.unet = self._load_model(model, "unet", device)
        # Shape of a single latent sample: first UNet input without its
        # leading (batch) axis.
        self.latent_shape = tuple(self._unet.inputs[0].shape)[1:]

        self._vae_decoder, self.vae_decoder = self._load_model(
            model, "vae_decoder", device
        )

        self._vae_encoder, self.vae_encoder = self._load_model(
            model, "vae_encoder", device
        )
        # Trailing two axes of the encoder input, used below as
        # (height, width) of the initial image.
        self.init_image_shape = tuple(self._vae_encoder.inputs[0].shape)[2:]

    def _load_model(self, repo_id, name, device):
        """Download ``<name>.xml`` / ``<name>.bin`` and return (model, compiled model)."""
        network = self.core.read_model(
            hf_hub_download(repo_id=repo_id, filename=name + ".xml"),
            hf_hub_download(repo_id=repo_id, filename=name + ".bin")
        )
        return network, self.core.compile_model(network, device)

    def _embed_text(self, text):
        """Tokenize ``text`` (padded/truncated to the model max length) and run the text encoder."""
        tokens = self.tokenizer(
            text,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True
        ).input_ids
        return result(
            self.text_encoder.infer_new_request({"tokens": np.array([tokens])})
        )

    def _preprocess_mask(self, mask):
        """Convert a 2-D mask into an inverted, latent-resolution blend weight.

        Args:
            mask: 2-D array (height, width); values are divided by 255, so
                a 0-255 range is assumed.

        Returns:
            float32 array of shape ``(1, 4, H // 8, W // 8)`` holding
            ``1 - mask / 255``. In ``__call__`` a weight of 1 keeps the
            (noised) initial image and a weight of 0 keeps the generated
            latents.
        """
        target_h, target_w = self.init_image_shape[0], self.init_image_shape[1]
        h, w = mask.shape
        # Resize when EITHER dimension is off; cv2 expects (width, height).
        if h != target_h or w != target_w:
            mask = cv2.resize(
                mask,
                (target_w, target_h),
                interpolation=cv2.INTER_NEAREST
            )
        # Bring the mask down to latent resolution (1/8 of the image size).
        mask = cv2.resize(
            mask,
            (target_w // 8, target_h // 8),
            interpolation=cv2.INTER_NEAREST
        )
        mask = mask.astype(np.float32) / 255.0
        # Repeat over 4 channels (presumably the latent channel count —
        # confirm against ``latent_shape``) and add a leading batch axis.
        mask = np.tile(mask, (4, 1, 1))
        mask = mask[None]
        mask = 1 - mask
        return mask

    def _preprocess_image(self, image):
        """Convert a BGR image into the encoder's input tensor.

        Args:
            image: array of shape (height, width, 3) in BGR channel order;
                values are divided by 255, so a 0-255 range is assumed.

        Returns:
            float32 array of shape ``(1, 3, H, W)`` scaled to ``[-1, 1]``.
        """
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # The image is height x width x channels: the spatial size is the
        # first two axes.
        h, w = image.shape[:2]
        target_h, target_w = self.init_image_shape[0], self.init_image_shape[1]
        # Resize when EITHER dimension is off; cv2 expects (width, height).
        if h != target_h or w != target_w:
            image = cv2.resize(
                image,
                (target_w, target_h),
                interpolation=cv2.INTER_LANCZOS4
            )

        # Map 0..255 to -1..1.
        image = image.astype(np.float32) / 255.0
        image = 2.0 * image - 1.0

        # HWC -> NCHW with a batch axis of 1.
        image = image[None].transpose(0, 3, 1, 2)
        return image

    def _encode_image(self, init_image):
        """Encode ``init_image`` with the VAE encoder and sample a latent.

        The encoder output is split in two halves along axis 1, used as
        mean and log-variance; a random sample ``mean + std * eps`` is drawn
        and scaled by 0.18215 (presumably the Stable Diffusion latent
        scaling factor — confirm against the model card).
        """
        moments = result(self.vae_encoder.infer_new_request({
            "init_image": self._preprocess_image(init_image)
        }))
        mean, logvar = np.split(moments, 2, axis=1)
        std = np.exp(logvar * 0.5)
        latent = (mean + std * np.random.randn(*mean.shape)) * 0.18215
        return latent

    def __call__(
        self,
        prompt,
        init_image=None,
        mask=None,
        strength=0.5,
        num_inference_steps=32,
        guidance_scale=7.5,
        eta=0.0
    ):
        """Generate an image for ``prompt``.

        Args:
            prompt: text handed to the tokenizer.
            init_image: optional BGR image (height, width, 3). When given,
                sampling starts from a noised encoding of it instead of
                pure noise.
            mask: optional 2-D mask, only used together with ``init_image``;
                see ``_preprocess_mask`` for its meaning.
            strength: fraction of ``num_inference_steps`` actually run when
                ``init_image`` is given.
            num_inference_steps: number of steps passed to
                ``scheduler.set_timesteps``.
            guidance_scale: values above 1.0 enable classifier-free
                guidance with an empty-prompt embedding.
            eta: forwarded to ``scheduler.step`` only when that method has
                an ``eta`` parameter.

        Returns:
            uint8 array of shape (height, width, channels) with the channel
            axis reversed relative to the decoder output (presumably BGR
            for use with OpenCV — confirm).
        """
        use_guidance = guidance_scale > 1.0
        is_lms = isinstance(self.scheduler, LMSDiscreteScheduler)

        # Text conditioning; with guidance the unconditional embedding goes
        # first so that index 0 / 1 of the UNet output are uncond / text.
        text_embeddings = self._embed_text(prompt)
        if use_guidance:
            uncond_embeddings = self._embed_text("")
            text_embeddings = np.concatenate(
                (uncond_embeddings, text_embeddings), axis=0
            )

        # Some schedulers take an ``offset`` argument; pass 1 when supported.
        accepts_offset = "offset" in inspect.signature(
            self.scheduler.set_timesteps
        ).parameters
        extra_set_kwargs = {}
        offset = 0
        if accepts_offset:
            offset = 1
            extra_set_kwargs["offset"] = 1

        self.scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs)

        # Starting latents: pure noise, or a noised encoding of init_image.
        if init_image is None:
            latents = np.random.randn(*self.latent_shape)
            init_timestep = num_inference_steps
        else:
            init_latents = self._encode_image(init_image)
            # NOTE(review): strength == 0 with offset == 0 gives
            # init_timestep == 0, i.e. timesteps[-0] == timesteps[0] and an
            # empty denoising loop — confirm whether that is intended.
            init_timestep = int(num_inference_steps * strength) + offset
            init_timestep = min(init_timestep, num_inference_steps)
            # np.int64 instead of the removed / platform-dependent np.long.
            timesteps = np.array(
                [[self.scheduler.timesteps[-init_timestep]]]
            ).astype(np.int64)
            noise = np.random.randn(*self.latent_shape)
            latents = self.scheduler.add_noise(init_latents, noise, timesteps)[0]

        # A mask only makes sense together with an initial image.
        if init_image is not None and mask is not None:
            mask = self._preprocess_mask(mask)
        else:
            mask = None

        if is_lms:
            latents = latents * self.scheduler.sigmas[0]

        # Forward ``eta`` only to schedulers whose step() accepts it.
        accepts_eta = "eta" in inspect.signature(self.scheduler.step).parameters
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        t_start = max(num_inference_steps - init_timestep + offset, 0)
        # tqdm wraps the sequence itself so it can report the total.
        for i, t in enumerate(tqdm(self.scheduler.timesteps[t_start:])):
            # Duplicate the latents for the uncond / text pair when guiding.
            if use_guidance:
                latent_model_input = np.stack([latents, latents], 0)
            else:
                latent_model_input = latents[None]
            if is_lms:
                # NOTE(review): for image-to-image runs t_start can be > 0
                # while i restarts at 0, so this sigma index may not line up
                # with timestep t — confirm against the scheduler.
                sigma = self.scheduler.sigmas[i]
                latent_model_input = latent_model_input / ((sigma**2 + 1) ** 0.5)

            # Predict the noise residual.
            noise_pred = result(self.unet.infer_new_request({
                "latent_model_input": latent_model_input,
                "t": t,
                "encoder_hidden_states": text_embeddings
            }))

            if use_guidance:
                # Classifier-free guidance: uncond + scale * (text - uncond).
                noise_pred = noise_pred[0] + guidance_scale * (noise_pred[1] - noise_pred[0])
            else:
                # Drop the batch axis so ``latents`` keeps the same rank on
                # every iteration.
                noise_pred = noise_pred[0]

            # The LMS scheduler is stepped by loop index, others by timestep.
            step_arg = i if is_lms else t
            latents = self.scheduler.step(
                noise_pred, step_arg, latents, **extra_step_kwargs
            )["prev_sample"]

            # Keep the masked-out region equal to the initial image noised
            # to the current timestep.
            if mask is not None:
                init_latents_proper = self.scheduler.add_noise(init_latents, noise, t)
                latents = ((init_latents_proper * mask) + (latents * (1 - mask)))[0]

        image = result(self.vae_decoder.infer_new_request({
            "latents": np.expand_dims(latents, 0)
        }))

        # Map -1..1 to 0..1, then CHW -> HWC, reverse channels, scale to uint8.
        image = (image / 2 + 0.5).clip(0, 1)
        image = (image[0].transpose(1, 2, 0)[:, :, ::-1] * 255).astype(np.uint8)
        return image
|
|