import random
from typing import Callable, Dict, List, Optional, Union

import torch
from diffusers import DiffusionPipeline
from diffusers.configuration_utils import ConfigMixin


class SuperDiffPipeline(DiffusionPipeline, ConfigMixin):
    """Two-prompt latent diffusion sampler.

    Each sampling step evaluates the denoiser under three text conditions
    (``prompt_1``, ``prompt_2`` and the empty prompt) and combines the
    resulting velocity fields with a per-sample weight ``kappa`` before
    taking a stochastic update step on the latents.

    NOTE(review): the update rule appears to follow the SuperDiff
    superposition scheme -- confirm against the reference implementation
    before changing any of the arithmetic in ``_forward``.
    """

    def __init__(
        self,
        model: Callable,
        vae: Callable,
        text_encoder: Callable,
        scheduler: Callable,
        tokenizer: Callable,
        **kwargs,
    ) -> None:
        """Store the components, move the networks to a device, record config.

        Parameters
        ----------
        model : Callable
            Denoising network. It is called as
            ``model(x, t, encoder_hidden_states=e).sample`` and must expose
            ``config.in_channels``.
        vae : Callable
            Autoencoder providing ``decode`` and ``config.scaling_factor``.
        text_encoder : Callable
            Maps token ids to embeddings; element ``[0]`` of its output is used.
        scheduler : Callable
            Noise scheduler providing ``set_timesteps``, ``timesteps``,
            ``sigmas`` and ``init_noise_sigma``.
        tokenizer : Callable
            Tokenizer providing ``model_max_length``; called with
            ``return_tensors="pt"``.
        kwargs
            Accepted for call-site compatibility and ignored.

        Returns
        -------
        None
        """
        super().__init__()

        self.model = model
        self.vae = vae
        self.text_encoder = text_encoder
        self.tokenizer = tokenizer
        self.scheduler = scheduler

        # Use the GPU whenever one is visible to torch.
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.vae.to(device)
        self.model.to(device)
        self.text_encoder.to(device)

        self.register_to_config(
            vae=vae.__class__.__name__,
            scheduler=scheduler.__class__.__name__,
            tokenizer=tokenizer.__class__.__name__,
            # The denoiser is passed in as ``model``; the config key stays
            # ``unet``. (The original referenced an undefined name ``unet``.)
            unet=model.__class__.__name__,
            text_encoder=text_encoder.__class__.__name__,
            device=device,
            batch_size=None,
            num_inference_steps=None,
            guidance_scale=None,
            lift=None,
            seed=None,
        )

    @torch.no_grad()
    def get_batch(self, latents: torch.Tensor, nrow: int, ncol: int) -> torch.Tensor:
        """Decode latents with the VAE and convert them to uint8 images.

        Parameters
        ----------
        latents : torch.Tensor
            Latents to decode; they are divided by
            ``vae.config.scaling_factor`` before decoding.
        nrow : int
            Unused.
        ncol : int
            Unused.

        Returns
        -------
        torch.Tensor
            ``torch.uint8`` tensor with the channel axis moved last
            (``permute(0, 2, 3, 1)``).
        """
        image = self.vae.decode(
            latents / self.vae.config.scaling_factor, return_dict=False
        )[0]
        # Map from [-1, 1] to [0, 1]. ``squeeze`` drops every size-1 axis, so
        # a single-image batch loses its batch axis and gets it back below.
        image = (image / 2 + 0.5).clamp(0, 1).squeeze()
        if image.dim() < 4:
            image = image.unsqueeze(0)
        # assumes decode yields (batch, channels, height, width) -- TODO confirm
        image = (image.permute(0, 2, 3, 1) * 255).to(torch.uint8)
        return image

    @torch.no_grad()
    def get_text_embedding(self, prompt: Union[str, List[str]]) -> torch.Tensor:
        """Tokenize ``prompt`` and run it through the text encoder.

        Parameters
        ----------
        prompt : str or list of str
            Prompt text; ``preprocess`` passes a list with one entry per
            batch element.

        Returns
        -------
        torch.Tensor
            Element ``[0]`` of the text encoder output.
        """
        text_input = self.tokenizer(
            prompt,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt",
        )
        return self.text_encoder(text_input.input_ids.to(self.device))[0]

    @torch.no_grad()
    def get_vel(
        self,
        t: float,
        sigma: float,
        latents: torch.Tensor,
        embeddings: List[torch.Tensor],
    ) -> torch.Tensor:
        """Evaluate the denoiser on sigma-scaled latents.

        Parameters
        ----------
        t : float
            Timestep passed to the model unchanged.
        sigma : float
            Noise level; the latents are divided by ``sqrt(sigma**2 + 1)``
            before the model call.
        latents : torch.Tensor
            Current latents.
        embeddings : list of torch.Tensor
            Text embeddings; concatenated with ``torch.cat`` and passed as
            ``encoder_hidden_states``.

        Returns
        -------
        torch.Tensor
            The ``.sample`` attribute of the model output.
        """
        embeds = torch.cat(embeddings)
        scaled_latents = latents / ((sigma**2 + 1) ** 0.5)
        return self.model(scaled_latents, t, encoder_hidden_states=embeds).sample

    def preprocess(
        self,
        prompt_1: str,
        prompt_2: str,
        seed: Optional[int] = None,
        num_inference_steps: int = 1000,
        batch_size: int = 1,
        lift: float = 0.0,
        height: int = 512,
        width: int = 512,
        guidance_scale: float = 7.5,
    ) -> Dict[str, torch.Tensor]:
        """Encode the prompts and draw the initial latent noise.

        Also stores ``batch_size``, ``num_inference_steps``,
        ``guidance_scale``, ``lift`` and ``seed`` on the instance for use by
        ``_forward`` and ``postprocess``, and configures the scheduler.

        Parameters
        ----------
        prompt_1 : str
            First prompt (embedded as ``obj_embeddings``).
        prompt_2 : str
            Second prompt (embedded as ``bg_embeddings``).
        seed : int, optional
            RNG seed. When ``None`` a random 32-bit seed is drawn and stored
            in ``self.seed``.
        num_inference_steps : int
            Number of scheduler steps.
        batch_size : int
            Number of samples generated in parallel.
        lift : float
            Additive term used in the ``kappa`` computation in ``_forward``.
        height : int
            Image height; the latent height is ``height // 8``.
        width : int
            Image width; the latent width is ``width // 8``.
        guidance_scale : float
            Guidance weight used in ``_forward``.

        Returns
        -------
        dict
            Keys ``"latents"``, ``"obj_embeddings"``, ``"uncond_embeddings"``
            and ``"bg_embeddings"``.
        """
        self.batch_size = batch_size
        self.num_inference_steps = num_inference_steps
        self.guidance_scale = guidance_scale
        self.lift = lift
        self.seed = seed
        if self.seed is None:
            self.seed = random.randint(0, 2**32 - 1)

        obj_embeddings = self.get_text_embedding([prompt_1] * batch_size)
        bg_embeddings = self.get_text_embedding([prompt_2] * batch_size)
        uncond_embeddings = self.get_text_embedding([""] * batch_size)

        # Seed the global RNGs. ``torch.manual_seed`` covers the CPU and all
        # CUDA devices, so the seed is honoured on either device and also
        # governs the ``randn_like`` noise drawn later in ``_forward``.
        # (``torch.cuda.manual_seed`` returns None and never touches the CPU
        # generator, so the previous ``generator=`` argument was always None.)
        torch.manual_seed(self.seed)
        latents = torch.randn(
            (batch_size, self.model.config.in_channels, height // 8, width // 8),
            device=self.device,
        )

        self.scheduler.set_timesteps(num_inference_steps)
        latents = latents * self.scheduler.init_noise_sigma

        return {
            "latents": latents,
            "obj_embeddings": obj_embeddings,
            "uncond_embeddings": uncond_embeddings,
            "bg_embeddings": bg_embeddings,
        }

    def _forward(self, model_inputs: Dict[str, torch.Tensor]) -> torch.Tensor:
        """Run the sampling loop over all scheduler timesteps.

        Parameters
        ----------
        model_inputs : dict
            Output of ``preprocess``. ``model_inputs["latents"]`` is updated
            in place.

        Returns
        -------
        torch.Tensor
            The latents after the last step.
        """
        latents = model_inputs["latents"]
        obj_embeddings = model_inputs["obj_embeddings"]
        uncond_embeddings = model_inputs["uncond_embeddings"]
        bg_embeddings = model_inputs["bg_embeddings"]

        history_shape = (self.num_inference_steps + 1, self.batch_size)
        # Per-step, per-sample mixing weight; row 0 keeps its initial 0.5.
        kappa = 0.5 * torch.ones(history_shape, device=self.device)
        # Running per-sample sums for each prompt. They are accumulated every
        # step but are not returned or otherwise read by this method.
        ll_obj = torch.ones(history_shape, device=self.device)
        ll_bg = torch.ones(history_shape, device=self.device)

        spatial = (1, 2, 3)  # reduce over every axis except the batch axis

        with torch.no_grad():
            for i, t in enumerate(self.scheduler.timesteps):
                sigma = self.scheduler.sigmas[i]
                dsigma = self.scheduler.sigmas[i + 1] - sigma

                vel_obj = self.get_vel(t, sigma, latents, [obj_embeddings])
                vel_uncond = self.get_vel(t, sigma, latents, [uncond_embeddings])
                vel_bg = self.get_vel(t, sigma, latents, [bg_embeddings])

                noise = torch.sqrt(2 * torch.abs(dsigma) * sigma) * torch.randn_like(
                    latents
                )

                # Step that would be taken with guidance on ``bg`` alone;
                # it only feeds the kappa computation below.
                dx_ind = (
                    2
                    * dsigma
                    * (vel_uncond + self.guidance_scale * (vel_bg - vel_uncond))
                    + noise
                )

                kappa[i + 1] = (
                    (torch.abs(dsigma) * (vel_bg - vel_obj) * (vel_bg + vel_obj)).sum(
                        spatial
                    )
                    - (dx_ind * (vel_obj - vel_bg)).sum(spatial)
                    + sigma * self.lift / self.num_inference_steps
                )
                # NOTE(review): the denominator is zero when the two
                # conditional velocities coincide (e.g. identical prompts).
                kappa[i + 1] /= (
                    2
                    * dsigma
                    * self.guidance_scale
                    * ((vel_obj - vel_bg) ** 2).sum(spatial)
                )

                # Guided velocity: ``bg`` guidance shifted towards ``obj`` by
                # the per-sample weight kappa.
                vf = vel_uncond + self.guidance_scale * (
                    (vel_bg - vel_uncond)
                    + kappa[i + 1][:, None, None, None] * (vel_obj - vel_bg)
                )
                dx = 2 * dsigma * vf + noise
                latents += dx

                ll_obj[i + 1] = ll_obj[i] + (
                    -torch.abs(dsigma) / sigma * (vel_obj) ** 2
                    - (dx * (vel_obj / sigma))
                ).sum(spatial)
                ll_bg[i + 1] = ll_bg[i] + (
                    -torch.abs(dsigma) / sigma * (vel_bg) ** 2
                    - (dx * (vel_bg / sigma))
                ).sum(spatial)

        return latents

    def postprocess(self, latents: torch.Tensor) -> torch.Tensor:
        """Decode final latents into uint8 images.

        Parameters
        ----------
        latents : torch.Tensor
            Latents returned by ``_forward``.

        Returns
        -------
        torch.Tensor
            ``torch.uint8`` images whose last axis has size 3.
        """
        image = self.get_batch(latents, 1, self.batch_size)

        # Sanity check: the decoder is expected to produce three channels.
        assert image.shape[-1] == 3

        # ``get_batch`` already returns uint8; this cast is kept as a guard.
        image = image.to(torch.uint8)
        return image

    def __call__(
        self,
        prompt_1: str,
        prompt_2: str,
        seed: Optional[int] = None,
        num_inference_steps: int = 1000,
        batch_size: int = 1,
        lift: float = 0.0,
        height: int = 512,
        width: int = 512,
        guidance_scale: float = 7.5,
    ) -> torch.Tensor:
        """Generate images from two prompts.

        Runs ``preprocess``, ``_forward`` and ``postprocess`` in sequence.

        Parameters
        ----------
        prompt_1 : str
            First prompt.
        prompt_2 : str
            Second prompt.
        seed : int, optional
            RNG seed; a random one is drawn when ``None``.
        num_inference_steps : int
            Number of scheduler steps.
        batch_size : int
            Number of images to generate.
        lift : float
            Additive term used in the ``kappa`` computation.
        height : int
            Image height in pixels.
        width : int
            Image width in pixels.
        guidance_scale : float
            Guidance weight.

        Returns
        -------
        torch.Tensor
            ``torch.uint8`` images as returned by ``postprocess``.
        """
        model_inputs = self.preprocess(
            prompt_1,
            prompt_2,
            seed,
            num_inference_steps,
            batch_size,
            lift,
            height,
            width,
            guidance_scale,
        )
        latents = self._forward(model_inputs)
        images = self.postprocess(latents)
        return images