# superdiff-sd-v1-4 / pipeline.py
import random
from typing import Callable, Dict, List, Optional

import torch
from diffusers import DiffusionPipeline
from diffusers.configuration_utils import ConfigMixin


class SuperDiffPipeline(DiffusionPipeline, ConfigMixin):
"""SuperDiffPipeline."""
    def __init__(
        self,
        model: Callable,
        vae: Callable,
        text_encoder: Callable,
        scheduler: Callable,
        tokenizer: Callable,
        **kwargs,
    ) -> None:
        """Assemble the pipeline from pre-loaded Stable Diffusion components.

        Parameters
        ----------
        model : Callable
            Denoising UNet (e.g. ``UNet2DConditionModel``).
        vae : Callable
            Variational autoencoder used to decode latents into images.
        text_encoder : Callable
            Text encoder (e.g. CLIP) that produces prompt embeddings.
        scheduler : Callable
            Noise scheduler exposing ``timesteps`` and ``sigmas``.
        tokenizer : Callable
            Tokenizer matching ``text_encoder``.
        kwargs :
            Unused; accepted for interface compatibility.
        """
super().__init__()
self.model = model
self.vae = vae
self.text_encoder = text_encoder
self.tokenizer = tokenizer
self.scheduler = scheduler
device = "cuda" if torch.cuda.is_available() else "cpu"
self.vae.to(device)
self.model.to(device)
self.text_encoder.to(device)
self.register_to_config(
vae=vae.__class__.__name__,
scheduler=scheduler.__class__.__name__,
tokenizer=tokenizer.__class__.__name__,
            unet=model.__class__.__name__,
text_encoder=text_encoder.__class__.__name__,
device=device,
batch_size=None,
num_inference_steps=None,
guidance_scale=None,
lift=None,
seed=None,
)
    @torch.no_grad()
    def get_batch(self, latents: torch.Tensor, nrow: int, ncol: int) -> torch.Tensor:
        """Decode latents into a batch of uint8 images.

        Parameters
        ----------
        latents : torch.Tensor
            Latents of shape ``(batch, channels, height // 8, width // 8)``.
        nrow : int
            Grid rows (unused; kept for interface compatibility).
        ncol : int
            Grid columns (unused; kept for interface compatibility).

        Returns
        -------
        torch.Tensor
            Images of shape ``(batch, height, width, 3)`` in ``torch.uint8``.
        """
image = self.vae.decode(
latents / self.vae.config.scaling_factor, return_dict=False
)[0]
image = (image / 2 + 0.5).clamp(0, 1).squeeze()
if len(image.shape) < 4:
image = image.unsqueeze(0)
image = (image.permute(0, 2, 3, 1) * 255).to(torch.uint8)
return image
    @torch.no_grad()
    def get_text_embedding(self, prompt: str) -> torch.Tensor:
        """Tokenize and encode a prompt with the text encoder.

        Parameters
        ----------
        prompt : str
            Prompt (or list of prompts) to encode.

        Returns
        -------
        torch.Tensor
            Encoder hidden states of shape ``(batch, seq_len, hidden_dim)``.
        """
text_input = self.tokenizer(
prompt,
padding="max_length",
max_length=self.tokenizer.model_max_length,
truncation=True,
return_tensors="pt",
)
return self.text_encoder(text_input.input_ids.to(self.device))[0]
    @torch.no_grad()
    def get_vel(
        self,
        t: float,
        sigma: float,
        latents: torch.Tensor,
        embeddings: List[torch.Tensor],
    ) -> torch.Tensor:
        """Evaluate the model's noise prediction ("velocity") at level ``sigma``.

        Parameters
        ----------
        t : float
            Current scheduler timestep.
        sigma : float
            Noise level associated with ``t``.
        latents : torch.Tensor
            Current latents.
        embeddings : List[torch.Tensor]
            Text embeddings to condition on.

        Returns
        -------
        torch.Tensor
            Predicted velocity with the same shape as ``latents``.
        """
        def v(_x, _e):
            return self.model(
                _x / ((sigma**2 + 1) ** 0.5), t, encoder_hidden_states=_e
            ).sample

        embeds = torch.cat(embeddings)
        return v(latents, embeds)
    def preprocess(
        self,
        prompt_1: str,
        prompt_2: str,
        seed: Optional[int] = None,
        num_inference_steps: int = 1000,
        batch_size: int = 1,
        lift: float = 0.0,
        height: int = 512,
        width: int = 512,
        guidance_scale: float = 7.5,
    ) -> Dict:
        """Encode both prompts and prepare the initial latents.

        Parameters
        ----------
        prompt_1 : str
            Prompt for the first ("object") concept.
        prompt_2 : str
            Prompt for the second ("background") concept.
        seed : Optional[int]
            Random seed; a random one is drawn if ``None``.
        num_inference_steps : int
            Number of sampling steps.
        batch_size : int
            Number of images to generate.
        lift : float
            Bias term used when solving for the superposition coefficient;
            shifts the balance between the two prompts.
        height : int
            Image height in pixels.
        width : int
            Image width in pixels.
        guidance_scale : float
            Classifier-free guidance scale.

        Returns
        -------
        Dict
            Initial latents plus object, background, and unconditional embeddings.
        """
        # Store the sampling hyperparameters on the pipeline
        self.batch_size = batch_size
        self.num_inference_steps = num_inference_steps
        self.guidance_scale = guidance_scale
        self.lift = lift
        self.seed = seed
        if self.seed is None:
            self.seed = random.randint(0, 2**32 - 1)
obj_prompt = [prompt_1]
bg_prompt = [prompt_2]
obj_embeddings = self.get_text_embedding(obj_prompt * batch_size)
bg_embeddings = self.get_text_embedding(bg_prompt * batch_size)
uncond_embeddings = self.get_text_embedding([""] * batch_size)
        # Seed a generator on the target device for reproducible initial noise
        # (torch.cuda.manual_seed returns None, so it cannot serve as a
        # Generator for torch.randn)
        generator = torch.Generator(device=self.device).manual_seed(self.seed)
        latents = torch.randn(
            (batch_size, self.model.config.in_channels, height // 8, width // 8),
            generator=generator,
            device=self.device,
        )
        self.scheduler.set_timesteps(num_inference_steps)
        # Scale the initial noise to the scheduler's starting sigma
        latents = latents * self.scheduler.init_noise_sigma
return {
"latents": latents,
"obj_embeddings": obj_embeddings,
"uncond_embeddings": uncond_embeddings,
"bg_embeddings": bg_embeddings,
}
    def _forward(self, model_inputs: Dict) -> torch.Tensor:
        """Run the SuperDiff sampling loop.

        Parameters
        ----------
        model_inputs : Dict
            Output of :meth:`preprocess`: initial latents and text embeddings.

        Returns
        -------
        torch.Tensor
            Final denoised latents.
        """
latents = model_inputs["latents"]
obj_embeddings = model_inputs["obj_embeddings"]
uncond_embeddings = model_inputs["uncond_embeddings"]
bg_embeddings = model_inputs["bg_embeddings"]
        kappa = 0.5 * torch.ones(
            (self.num_inference_steps + 1, self.batch_size), device=self.device
        )
        ll_obj = torch.ones(
            (self.num_inference_steps + 1, self.batch_size), device=self.device
        )
        ll_bg = torch.ones(
            (self.num_inference_steps + 1, self.batch_size), device=self.device
        )
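        # Euler-Maruyama sampling: at each step the two conditional velocities
        # are superposed with weight kappa, chosen in closed form below so that
        # the running log-density estimates ll_obj and ll_bg change at equal
        # rates, up to the ``lift`` bias.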
with torch.no_grad():
            for i, t in enumerate(self.scheduler.timesteps):
                dsigma = self.scheduler.sigmas[i + 1] - self.scheduler.sigmas[i]
                sigma = self.scheduler.sigmas[i]
                vel_obj = self.get_vel(t, sigma, latents, [obj_embeddings])
                vel_uncond = self.get_vel(t, sigma, latents, [uncond_embeddings])
                vel_bg = self.get_vel(t, sigma, latents, [bg_embeddings])
                noise = torch.sqrt(2 * torch.abs(dsigma) * sigma) * torch.randn_like(
                    latents
                )
dx_ind = (
2
* dsigma
* (vel_uncond + self.guidance_scale * (vel_bg - vel_uncond))
+ noise
)
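                # Closed-form superposition coefficient: chosen so the Ito
                # log-density estimates of the two prompts move in lockstep,
                # shifted by a term proportional to ``lift``.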
                kappa[i + 1] = (
                    (torch.abs(dsigma) * (vel_bg - vel_obj) * (vel_bg + vel_obj)).sum(
                        (1, 2, 3)
                    )
                    - (dx_ind * (vel_obj - vel_bg)).sum((1, 2, 3))
                    + sigma * self.lift / self.num_inference_steps
                )
                kappa[i + 1] /= (
                    2
                    * dsigma
                    * self.guidance_scale
                    * ((vel_obj - vel_bg) ** 2).sum((1, 2, 3))
                )
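                # Classifier-free-guided velocity with the conditionals mixed
                # by kappa: kappa = 1 recovers prompt_1 alone, kappa = 0
                # recovers prompt_2 alone.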
vf = vel_uncond + self.guidance_scale * (
(vel_bg - vel_uncond)
+ kappa[i + 1][:, None, None, None] * (vel_obj - vel_bg)
)
dx = 2 * dsigma * vf + noise
latents += dx
                ll_obj[i + 1] = ll_obj[i] + (
                    -torch.abs(dsigma) / sigma * (vel_obj) ** 2
                    - (dx * (vel_obj / sigma))
                ).sum((1, 2, 3))
                ll_bg[i + 1] = ll_bg[i] + (
                    -torch.abs(dsigma) / sigma * (vel_bg) ** 2
                    - (dx * (vel_bg / sigma))
                ).sum((1, 2, 3))
return latents
    def postprocess(self, latents: torch.Tensor) -> torch.Tensor:
        """Decode the final latents into uint8 images.

        Parameters
        ----------
        latents : torch.Tensor
            Final latents from the sampling loop.

        Returns
        -------
        torch.Tensor
            Images of shape ``(batch, height, width, 3)`` as ``torch.uint8``.
        """
        image = self.get_batch(latents, 1, self.batch_size)
        # get_batch already returns (batch, height, width, 3) uint8; guard
        # against grayscale or otherwise invalid shapes
        assert image.shape[-1] == 3
        return image
    def __call__(
        self,
        prompt_1: str,
        prompt_2: str,
        seed: Optional[int] = None,
        num_inference_steps: int = 1000,
        batch_size: int = 1,
        lift: float = 0.0,
        height: int = 512,
        width: int = 512,
        guidance_scale: float = 7.5,
    ) -> torch.Tensor:
        """Generate images from the superposition of two prompts.

        Parameters
        ----------
        prompt_1 : str
            Prompt for the first ("object") concept.
        prompt_2 : str
            Prompt for the second ("background") concept.
        seed : Optional[int]
            Random seed; a random one is drawn if ``None``.
        num_inference_steps : int
            Number of sampling steps.
        batch_size : int
            Number of images to generate.
        lift : float
            Bias term used when solving for the superposition coefficient;
            shifts the balance between the two prompts.
        height : int
            Image height in pixels.
        width : int
            Image width in pixels.
        guidance_scale : float
            Classifier-free guidance scale.

        Returns
        -------
        torch.Tensor
            Generated images of shape ``(batch, height, width, 3)``.
        """
# Preprocess inputs
model_inputs = self.preprocess(
prompt_1,
prompt_2,
seed,
num_inference_steps,
batch_size,
lift,
height,
width,
guidance_scale,
)
# Forward pass through the pipeline
latents = self._forward(model_inputs)
# Postprocess to generate the final output
images = self.postprocess(latents)
return images
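

# Minimal usage sketch (not part of the original file; the checkpoint ID and
# the Euler scheduler choice are assumptions for illustration). The pipeline
# only requires that ``scheduler`` expose ``timesteps`` and ``sigmas`` after
# ``set_timesteps``, which EulerDiscreteScheduler does.
if __name__ == "__main__":
    from diffusers import AutoencoderKL, EulerDiscreteScheduler, UNet2DConditionModel
    from transformers import CLIPTextModel, CLIPTokenizer

    repo = "CompVis/stable-diffusion-v1-4"  # assumed base checkpoint
    pipe = SuperDiffPipeline(
        model=UNet2DConditionModel.from_pretrained(repo, subfolder="unet"),
        vae=AutoencoderKL.from_pretrained(repo, subfolder="vae"),
        text_encoder=CLIPTextModel.from_pretrained(repo, subfolder="text_encoder"),
        scheduler=EulerDiscreteScheduler.from_pretrained(repo, subfolder="scheduler"),
        tokenizer=CLIPTokenizer.from_pretrained(repo, subfolder="tokenizer"),
    )
    images = pipe(
        "a photo of a lion",
        "a photo of a sunset over the ocean",
        num_inference_steps=50,
        seed=0,
    )
    print(images.shape)  # expected: (1, 512, 512, 3)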