Spaces:
Runtime error
Runtime error
import torch | |
from fcbh.ldm.modules.diffusionmodules.openaimodel import UNetModel | |
from fcbh.ldm.modules.encoders.noise_aug_modules import CLIPEmbeddingNoiseAugmentation | |
from fcbh.ldm.modules.diffusionmodules.openaimodel import Timestep | |
import fcbh.model_management | |
import fcbh.conds | |
from enum import Enum | |
from . import utils | |
class ModelType(Enum): | |
EPS = 1 | |
V_PREDICTION = 2 | |
from fcbh.model_sampling import EPS, V_PREDICTION, ModelSamplingDiscrete | |
def model_sampling(model_config, model_type): | |
if model_type == ModelType.EPS: | |
c = EPS | |
elif model_type == ModelType.V_PREDICTION: | |
c = V_PREDICTION | |
s = ModelSamplingDiscrete | |
class ModelSampling(s, c): | |
pass | |
return ModelSampling(model_config) | |
class BaseModel(torch.nn.Module): | |
def __init__(self, model_config, model_type=ModelType.EPS, device=None): | |
super().__init__() | |
unet_config = model_config.unet_config | |
self.latent_format = model_config.latent_format | |
self.model_config = model_config | |
if not unet_config.get("disable_unet_model_creation", False): | |
self.diffusion_model = UNetModel(**unet_config, device=device) | |
self.model_type = model_type | |
self.model_sampling = model_sampling(model_config, model_type) | |
self.adm_channels = unet_config.get("adm_in_channels", None) | |
if self.adm_channels is None: | |
self.adm_channels = 0 | |
self.inpaint_model = False | |
print("model_type", model_type.name) | |
print("adm", self.adm_channels) | |
def apply_model(self, x, t, c_concat=None, c_crossattn=None, control=None, transformer_options={}, **kwargs): | |
sigma = t | |
xc = self.model_sampling.calculate_input(sigma, x) | |
if c_concat is not None: | |
xc = torch.cat([xc] + [c_concat], dim=1) | |
context = c_crossattn | |
dtype = self.get_dtype() | |
xc = xc.to(dtype) | |
t = self.model_sampling.timestep(t).float() | |
context = context.to(dtype) | |
extra_conds = {} | |
for o in kwargs: | |
extra = kwargs[o] | |
if hasattr(extra, "to"): | |
extra = extra.to(dtype) | |
extra_conds[o] = extra | |
model_output = self.diffusion_model(xc, t, context=context, control=control, transformer_options=transformer_options, **extra_conds).float() | |
return self.model_sampling.calculate_denoised(sigma, model_output, x) | |
def get_dtype(self): | |
return self.diffusion_model.dtype | |
def is_adm(self): | |
return self.adm_channels > 0 | |
def encode_adm(self, **kwargs): | |
return None | |
def extra_conds(self, **kwargs): | |
out = {} | |
if self.inpaint_model: | |
concat_keys = ("mask", "masked_image") | |
cond_concat = [] | |
denoise_mask = kwargs.get("denoise_mask", None) | |
latent_image = kwargs.get("latent_image", None) | |
noise = kwargs.get("noise", None) | |
device = kwargs["device"] | |
def blank_inpaint_image_like(latent_image): | |
blank_image = torch.ones_like(latent_image) | |
# these are the values for "zero" in pixel space translated to latent space | |
blank_image[:,0] *= 0.8223 | |
blank_image[:,1] *= -0.6876 | |
blank_image[:,2] *= 0.6364 | |
blank_image[:,3] *= 0.1380 | |
return blank_image | |
for ck in concat_keys: | |
if denoise_mask is not None: | |
if ck == "mask": | |
cond_concat.append(denoise_mask[:,:1].to(device)) | |
elif ck == "masked_image": | |
cond_concat.append(latent_image.to(device)) #NOTE: the latent_image should be masked by the mask in pixel space | |
else: | |
if ck == "mask": | |
cond_concat.append(torch.ones_like(noise)[:,:1]) | |
elif ck == "masked_image": | |
cond_concat.append(blank_inpaint_image_like(noise)) | |
data = torch.cat(cond_concat, dim=1) | |
out['c_concat'] = fcbh.conds.CONDNoiseShape(data) | |
adm = self.encode_adm(**kwargs) | |
if adm is not None: | |
out['y'] = fcbh.conds.CONDRegular(adm) | |
return out | |
def load_model_weights(self, sd, unet_prefix=""): | |
to_load = {} | |
keys = list(sd.keys()) | |
for k in keys: | |
if k.startswith(unet_prefix): | |
to_load[k[len(unet_prefix):]] = sd.pop(k) | |
to_load = self.model_config.process_unet_state_dict(to_load) | |
m, u = self.diffusion_model.load_state_dict(to_load, strict=False) | |
if len(m) > 0: | |
print("unet missing:", m) | |
if len(u) > 0: | |
print("unet unexpected:", u) | |
del to_load | |
return self | |
def process_latent_in(self, latent): | |
return self.latent_format.process_in(latent) | |
def process_latent_out(self, latent): | |
return self.latent_format.process_out(latent) | |
def state_dict_for_saving(self, clip_state_dict, vae_state_dict): | |
clip_state_dict = self.model_config.process_clip_state_dict_for_saving(clip_state_dict) | |
unet_sd = self.diffusion_model.state_dict() | |
unet_state_dict = {} | |
for k in unet_sd: | |
unet_state_dict[k] = fcbh.model_management.resolve_lowvram_weight(unet_sd[k], self.diffusion_model, k) | |
unet_state_dict = self.model_config.process_unet_state_dict_for_saving(unet_state_dict) | |
vae_state_dict = self.model_config.process_vae_state_dict_for_saving(vae_state_dict) | |
if self.get_dtype() == torch.float16: | |
clip_state_dict = utils.convert_sd_to(clip_state_dict, torch.float16) | |
vae_state_dict = utils.convert_sd_to(vae_state_dict, torch.float16) | |
if self.model_type == ModelType.V_PREDICTION: | |
unet_state_dict["v_pred"] = torch.tensor([]) | |
return {**unet_state_dict, **vae_state_dict, **clip_state_dict} | |
def set_inpaint(self): | |
self.inpaint_model = True | |
def memory_required(self, input_shape): | |
area = input_shape[0] * input_shape[2] * input_shape[3] | |
if fcbh.model_management.xformers_enabled() or fcbh.model_management.pytorch_attention_flash_attention(): | |
#TODO: this needs to be tweaked | |
return (area / (fcbh.model_management.dtype_size(self.get_dtype()) * 10)) * (1024 * 1024) | |
else: | |
#TODO: this formula might be too aggressive since I tweaked the sub-quad and split algorithms to use less memory. | |
return (((area * 0.6) / 0.9) + 1024) * (1024 * 1024) | |
def unclip_adm(unclip_conditioning, device, noise_augmentor, noise_augment_merge=0.0): | |
adm_inputs = [] | |
weights = [] | |
noise_aug = [] | |
for unclip_cond in unclip_conditioning: | |
for adm_cond in unclip_cond["clip_vision_output"].image_embeds: | |
weight = unclip_cond["strength"] | |
noise_augment = unclip_cond["noise_augmentation"] | |
noise_level = round((noise_augmentor.max_noise_level - 1) * noise_augment) | |
c_adm, noise_level_emb = noise_augmentor(adm_cond.to(device), noise_level=torch.tensor([noise_level], device=device)) | |
adm_out = torch.cat((c_adm, noise_level_emb), 1) * weight | |
weights.append(weight) | |
noise_aug.append(noise_augment) | |
adm_inputs.append(adm_out) | |
if len(noise_aug) > 1: | |
adm_out = torch.stack(adm_inputs).sum(0) | |
noise_augment = noise_augment_merge | |
noise_level = round((noise_augmentor.max_noise_level - 1) * noise_augment) | |
c_adm, noise_level_emb = noise_augmentor(adm_out[:, :noise_augmentor.time_embed.dim], noise_level=torch.tensor([noise_level], device=device)) | |
adm_out = torch.cat((c_adm, noise_level_emb), 1) | |
return adm_out | |
class SD21UNCLIP(BaseModel): | |
def __init__(self, model_config, noise_aug_config, model_type=ModelType.V_PREDICTION, device=None): | |
super().__init__(model_config, model_type, device=device) | |
self.noise_augmentor = CLIPEmbeddingNoiseAugmentation(**noise_aug_config) | |
def encode_adm(self, **kwargs): | |
unclip_conditioning = kwargs.get("unclip_conditioning", None) | |
device = kwargs["device"] | |
if unclip_conditioning is None: | |
return torch.zeros((1, self.adm_channels)) | |
else: | |
return unclip_adm(unclip_conditioning, device, self.noise_augmentor, kwargs.get("unclip_noise_augment_merge", 0.05)) | |
def sdxl_pooled(args, noise_augmentor): | |
if "unclip_conditioning" in args: | |
return unclip_adm(args.get("unclip_conditioning", None), args["device"], noise_augmentor)[:,:1280] | |
else: | |
return args["pooled_output"] | |
class SDXLRefiner(BaseModel): | |
def __init__(self, model_config, model_type=ModelType.EPS, device=None): | |
super().__init__(model_config, model_type, device=device) | |
self.embedder = Timestep(256) | |
self.noise_augmentor = CLIPEmbeddingNoiseAugmentation(**{"noise_schedule_config": {"timesteps": 1000, "beta_schedule": "squaredcos_cap_v2"}, "timestep_dim": 1280}) | |
def encode_adm(self, **kwargs): | |
clip_pooled = sdxl_pooled(kwargs, self.noise_augmentor) | |
width = kwargs.get("width", 768) | |
height = kwargs.get("height", 768) | |
crop_w = kwargs.get("crop_w", 0) | |
crop_h = kwargs.get("crop_h", 0) | |
if kwargs.get("prompt_type", "") == "negative": | |
aesthetic_score = kwargs.get("aesthetic_score", 2.5) | |
else: | |
aesthetic_score = kwargs.get("aesthetic_score", 6) | |
out = [] | |
out.append(self.embedder(torch.Tensor([height]))) | |
out.append(self.embedder(torch.Tensor([width]))) | |
out.append(self.embedder(torch.Tensor([crop_h]))) | |
out.append(self.embedder(torch.Tensor([crop_w]))) | |
out.append(self.embedder(torch.Tensor([aesthetic_score]))) | |
flat = torch.flatten(torch.cat(out)).unsqueeze(dim=0).repeat(clip_pooled.shape[0], 1) | |
return torch.cat((clip_pooled.to(flat.device), flat), dim=1) | |
class SDXL(BaseModel): | |
def __init__(self, model_config, model_type=ModelType.EPS, device=None): | |
super().__init__(model_config, model_type, device=device) | |
self.embedder = Timestep(256) | |
self.noise_augmentor = CLIPEmbeddingNoiseAugmentation(**{"noise_schedule_config": {"timesteps": 1000, "beta_schedule": "squaredcos_cap_v2"}, "timestep_dim": 1280}) | |
def encode_adm(self, **kwargs): | |
clip_pooled = sdxl_pooled(kwargs, self.noise_augmentor) | |
width = kwargs.get("width", 768) | |
height = kwargs.get("height", 768) | |
crop_w = kwargs.get("crop_w", 0) | |
crop_h = kwargs.get("crop_h", 0) | |
target_width = kwargs.get("target_width", width) | |
target_height = kwargs.get("target_height", height) | |
out = [] | |
out.append(self.embedder(torch.Tensor([height]))) | |
out.append(self.embedder(torch.Tensor([width]))) | |
out.append(self.embedder(torch.Tensor([crop_h]))) | |
out.append(self.embedder(torch.Tensor([crop_w]))) | |
out.append(self.embedder(torch.Tensor([target_height]))) | |
out.append(self.embedder(torch.Tensor([target_width]))) | |
flat = torch.flatten(torch.cat(out)).unsqueeze(dim=0).repeat(clip_pooled.shape[0], 1) | |
return torch.cat((clip_pooled.to(flat.device), flat), dim=1) | |