Create pipeline_spad.py
Browse filesthis is just an experiment, not the perfect spad, more work to do (fix upcoming bugs)
- pipeline_spad.py +68 -0
pipeline_spad.py
ADDED
@@ -0,0 +1,68 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import torch
|
2 |
+
from diffusers import AutoencoderKL, DiffusionPipeline
|
3 |
+
from transformers import CLIPTextModel, CLIPTokenizer
|
4 |
+
from mv_unet import SPADUnetModel
|
5 |
+
from diffusers.schedulers import DPMSolverMultistepScheduler
|
6 |
+
|
7 |
+
class SPADPipeline(DiffusionPipeline):
|
8 |
+
def __init__(
|
9 |
+
self,
|
10 |
+
vae: AutoencoderKL,
|
11 |
+
unet: SPADUnetModel,
|
12 |
+
tokenizer: CLIPTokenizer,
|
13 |
+
text_encoder: CLIPTextModel,
|
14 |
+
scheduler: DPMSolverMultistepScheduler,
|
15 |
+
):
|
16 |
+
super().__init__()
|
17 |
+
|
18 |
+
self.vae = vae
|
19 |
+
self.unet = unet
|
20 |
+
self.tokenizer = tokenizer
|
21 |
+
self.text_encoder = text_encoder
|
22 |
+
self.scheduler = scheduler
|
23 |
+
|
24 |
+
# make sure all our models are on the same device
|
25 |
+
self.vae.to(self.device)
|
26 |
+
self.unet.to(self.device)
|
27 |
+
self.text_encoder.to(self.device)
|
28 |
+
|
29 |
+
def encode_prompt(self, prompt, device, num_images_per_prompt, do_classifier_free_guidance, negative_prompt=None):
|
30 |
+
text_input = self.tokenizer(
|
31 |
+
prompt,
|
32 |
+
padding="max_length",
|
33 |
+
max_length=self.tokenizer.model_max_length,
|
34 |
+
return_tensors="pt"
|
35 |
+
)
|
36 |
+
text_embeddings = self.text_encoder(text_input.input_ids.to(device))[0]
|
37 |
+
|
38 |
+
# we duplicate the text embeddings for each generation, just to save time :)
|
39 |
+
bs_embed, seq_len, _ = text_embeddings.shape
|
40 |
+
text_embeddings = text_embeddings.repeat(1, num_images_per_prompt, 1)
|
41 |
+
text_embeddings = text_embeddings.view(bs_embed * num_images_per_prompt, seq_len, -1)
|
42 |
+
|
43 |
+
return text_embeddings
|
44 |
+
|
45 |
+
def __call__(self, prompt, num_inference_steps=50, guidance_scale=7.5):
|
46 |
+
# encide the prompt into the text embeddings
|
47 |
+
text_embeddings = self.encode_prompt(prompt, self.device, 1, do_classifier_free_guidance=False)
|
48 |
+
|
49 |
+
# this is the initial noise sample
|
50 |
+
latents = torch.randn(
|
51 |
+
(text_embeddings.shape[0], self.unet.in_channels, self.unet.image_size, self.unet.image_size),
|
52 |
+
device=self.device
|
53 |
+
)
|
54 |
+
|
55 |
+
# setting up the scheduler
|
56 |
+
self.scheduler.set_timesteps(num_inference_steps)
|
57 |
+
|
58 |
+
# iterate and generate
|
59 |
+
for t in self.scheduler.timesteps:
|
60 |
+
latents = self.scheduler.scale_model_input(latents, t)
|
61 |
+
latents = self.unet(latents, t, text_embeddings)["sample"]
|
62 |
+
latents = self.scheduler.step(latents, t, latents, guidance_scale=guidance_scale)["prev_sample"]
|
63 |
+
|
64 |
+
# decode latents into images
|
65 |
+
images = self.vae.decode(latents)
|
66 |
+
images = (images / 2 + 0.5).clamp(0, 1)
|
67 |
+
|
68 |
+
return images
|