# -*- coding: utf-8 -*-
"""Copy of compose_glide.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/19xx6Nu4FeiGj-TzTUFxBf-15IkeuFx_F
"""

import glob
import os
import shutil
import time

import gradio as gr
import numpy as np
import open3d as o3d
import open3d.visualization.rendering as rendering
import torch as th
from PIL import Image
from tqdm.auto import tqdm

from composable_diffusion.download import download_model
from composable_diffusion.model_creation import create_model_and_diffusion as create_model_and_diffusion_for_clevr
from composable_diffusion.model_creation import model_and_diffusion_defaults as model_and_diffusion_defaults_for_clevr
from composable_diffusion.composable_stable_diffusion.pipeline_composable_stable_diffusion import \
    ComposableStableDiffusionPipeline

from point_e.diffusion.configs import DIFFUSION_CONFIGS, diffusion_from_config
from point_e.diffusion.sampler import PointCloudSampler
from point_e.models.download import load_checkpoint
from point_e.models.configs import MODEL_CONFIGS, model_from_config
from point_e.util.pc_to_mesh import marching_cubes_mesh

has_cuda = th.cuda.is_available()
device = th.device('cuda' if has_cuda else 'cpu')
print(has_cuda)

# init composable stable diffusion model
pipe = ComposableStableDiffusionPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
).to(device)
pipe.safety_checker = None

# create model for CLEVR objects
clevr_options = model_and_diffusion_defaults_for_clevr()

flags = {
    "image_size": 128,
    "num_channels": 192,
    "num_res_blocks": 2,
    "learn_sigma": True,
    "use_scale_shift_norm": False,
    "raw_unet": True,
    "noise_schedule": "squaredcos_cap_v2",
    "rescale_learned_sigmas": False,
    "rescale_timesteps": False,
    "num_classes": '2',
    "dataset": "clevr_pos",
    "use_fp16": has_cuda,
    "timestep_respacing": '100'
}

for key, val in flags.items():
    clevr_options[key] = val

clevr_model, clevr_diffusion = create_model_and_diffusion_for_clevr(**clevr_options)
clevr_model.eval()
if has_cuda:
    clevr_model.convert_to_fp16()
clevr_model.to(device)
clevr_model.load_state_dict(th.load(download_model('clevr_pos'), device))
print('total clevr_pos parameters', sum(x.numel() for x in clevr_model.parameters()))
# create Point-E base and upsampler models
print('creating base model...')
base_name = 'base40M-textvec'
base_model = model_from_config(MODEL_CONFIGS[base_name], device)
base_model.eval()
base_diffusion = diffusion_from_config(DIFFUSION_CONFIGS[base_name])

print('creating upsample model...')
upsampler_model = model_from_config(MODEL_CONFIGS['upsample'], device)
upsampler_model.eval()
upsampler_diffusion = diffusion_from_config(DIFFUSION_CONFIGS['upsample'])

print('downloading base checkpoint...')
base_model.load_state_dict(load_checkpoint(base_name, device))

print('downloading upsampler checkpoint...')
upsampler_model.load_state_dict(load_checkpoint('upsample', device))

# create the SDF model used to convert sampled point clouds into meshes
print('creating SDF model...')
name = 'sdf'
model = model_from_config(MODEL_CONFIGS[name], device)
model.eval()

print('loading SDF model...')
model.load_state_dict(load_checkpoint(name, device))
def compose_pointe(prompt, weights, version):
    # `version` is unused but kept to match the gradio inputs signature
    weight_list = [float(x.strip()) for x in weights.split('|')]
    sampler = PointCloudSampler(
        device=device,
        models=[base_model, upsampler_model],
        diffusions=[base_diffusion, upsampler_diffusion],
        num_points=[1024, 4096 - 1024],
        aux_channels=['R', 'G', 'B'],
        guidance_scale=[weight_list, 0.0],  # one guidance weight per prompt for the base model
        model_kwargs_key_filter=('texts', ''),  # do not condition the upsampler at all
    )

    def generate_pcd(prompt_list):
        # produce a sample from the model, keeping only the final denoising step
        samples = None
        for x in tqdm(sampler.sample_batch_progressive(batch_size=1, model_kwargs=dict(texts=prompt_list))):
            samples = x
        return samples

    def generate_fig(samples):
        pc = sampler.output_to_point_clouds(samples)[0]
        return pc

    def generate_mesh(pc):
        mesh = marching_cubes_mesh(
            pc=pc,
            model=model,
            batch_size=4096,
            grid_size=128,  # grid_size of 128 matches the resolution used in evals
            progress=True,
        )
        return mesh

    def generate_video(mesh_path):
        render = rendering.OffscreenRenderer(640, 480)
        mesh = o3d.io.read_triangle_mesh(mesh_path)
        mesh.compute_vertex_normals()
        mat = o3d.visualization.rendering.MaterialRecord()
        mat.shader = 'defaultLit'
        render.scene.camera.look_at([0, 0, 0], [1, 1, 1], [0, 0, 1])
        render.scene.add_geometry('mesh', mesh, mat)
        timestr = time.strftime("%Y%m%d-%H%M%S")
        os.makedirs(timestr, exist_ok=True)

        def update_geometry():
            render.scene.clear_geometry()
            render.scene.add_geometry('mesh', mesh, mat)

        def generate_images():
            # rotate the mesh by pi/32 per frame; 64 frames give two full revolutions
            for i in range(64):
                R = mesh.get_rotation_matrix_from_xyz((0, 0, np.pi / 32))
                mesh.rotate(R, center=(0, 0, 0))
                update_geometry()
                img = render.render_to_image()
                o3d.io.write_image(os.path.join(timestr, "{:05d}.jpg".format(i)), img, quality=100)
                time.sleep(0.05)

        generate_images()
        image_list = []
        for filename in sorted(glob.glob(f'{timestr}/*.jpg')):
            im = Image.open(filename)
            image_list.append(im)
        # remove the frame folder once all frames are loaded
        shutil.rmtree(timestr)
        return image_list

    prompt_list = [x.strip() for x in prompt.split("|")]
    pcd = generate_pcd(prompt_list)
    pc = generate_fig(pcd)
    mesh = generate_mesh(pc)
    timestr = time.strftime("%Y%m%d-%H%M%S")
    mesh_path = f'{timestr}.ply'
    with open(mesh_path, 'wb') as f:
        mesh.write_ply(f)
    image_frames = generate_video(mesh_path)
    gif_path = f'{timestr}.gif'
    image_frames[0].save(gif_path, save_all=True, optimize=False, duration=5,
                         append_images=image_frames[1:], loop=0)
    return gif_path
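
# A minimal sketch (not used by the demo) of the composition rule that the
# samplers above and below rely on: weighted classifier-free guidance over
# several conditions. Given an unconditional noise prediction `eps_uncond`
# and per-condition predictions `cond_eps_list` with weights `weight_list`,
#     eps = eps_uncond + sum_i w_i * (eps_i - eps_uncond),
# where positive weights act as AND and negative weights act as NOT.
# All names here are hypothetical stand-ins, not part of the demo itself.
def composed_eps_sketch(eps_uncond, cond_eps_list, weight_list):
    composed = eps_uncond.clone()
    for w, cond_eps in zip(weight_list, cond_eps_list):
        composed = composed + w * (cond_eps - eps_uncond)
    return composed
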
def compose_clevr_objects(prompt, weights, steps):
    weights = [float(x.strip()) for x in weights.split('|')]
    weights = th.tensor(weights, device=device).reshape(-1, 1, 1, 1)
    coordinates = [
        [float(x.split(',')[0].strip()), float(x.split(',')[1].strip())]
        for x in prompt.split('|')
    ]
    coordinates += [[-1, -1]]  # add an unconditional score label
    batch_size = 1
    clevr_options['timestep_respacing'] = str(int(steps))
    _, clevr_diffusion = create_model_and_diffusion_for_clevr(**clevr_options)

    def model_fn(x_t, ts, **kwargs):
        # replicate x_t once per label, then combine the conditional and
        # unconditional noise predictions with the user-supplied weights
        half = x_t[:1]
        combined = th.cat([half] * kwargs['y'].size(0), dim=0)
        model_out = clevr_model(combined, ts, **kwargs)
        eps, rest = model_out[:, :3], model_out[:, 3:]
        masks = kwargs.get('masks')
        cond_eps = eps[masks]
        uncond_eps = eps[~masks]
        half_eps = uncond_eps + (weights * (cond_eps - uncond_eps)).sum(dim=0, keepdim=True)
        eps = th.cat([half_eps] * x_t.size(0), dim=0)
        return th.cat([eps, rest], dim=1)

    def sample(coordinates):
        masks = [True] * (len(coordinates) - 1) + [False]
        model_kwargs = dict(
            y=th.tensor(coordinates, dtype=th.float, device=device),
            masks=th.tensor(masks, dtype=th.bool, device=device)
        )
        samples = clevr_diffusion.p_sample_loop(
            model_fn,
            (len(coordinates), 3, clevr_options["image_size"], clevr_options["image_size"]),
            device=device,
            clip_denoised=True,
            progress=True,
            model_kwargs=model_kwargs,
            cond_fn=None,
        )[:batch_size]
        return samples

    samples = sample(coordinates)
    out_img = samples[0].permute(1, 2, 0)
    out_img = (out_img + 1) / 2
    out_img = (out_img.detach().cpu() * 255.).to(th.uint8)
    out_img = out_img.numpy()
    return out_img


def stable_diffusion_compose(prompt, steps, weights, seed):
    generator = th.Generator(device.type).manual_seed(int(seed))
    image = pipe(prompt, guidance_scale=7.5, num_inference_steps=int(steps),
                 weights=weights, generator=generator).images[0]
    image.save(f'{"_".join(prompt.split())}.png')
    return image


def compose_2D_diffusion(prompt, weights, version, steps, seed):
    try:
        with th.no_grad():
            if version == 'Stable_Diffusion_1v_4':
                return stable_diffusion_compose(prompt, steps, weights, seed)
            return compose_clevr_objects(prompt, weights, steps)
    except Exception:
        return None


examples_1 = "A castle in a forest | grainy, fog"
examples_3 = '0.1, 0.5 | 0.3, 0.5 | 0.5, 0.5 | 0.7, 0.5 | 0.9, 0.5'
examples_5 = 'a white church | lightning in the background'
examples_6 = 'mystical trees | A dark magical pond | dark'
examples_7 = 'A lake | A mountain | Cherry Blossoms next to the lake'

image_examples = [
    [examples_6, "7.5 | 7.5 | -7.5", 'Stable_Diffusion_1v_4', 50, 8],
    [examples_6, "7.5 | 7.5 | 7.5", 'Stable_Diffusion_1v_4', 50, 8],
    [examples_1, "7.5 | -7.5", 'Stable_Diffusion_1v_4', 50, 0],
    [examples_7, "7.5 | 7.5 | 7.5", 'Stable_Diffusion_1v_4', 50, 3],
    [examples_5, "7.5 | 7.5", 'Stable_Diffusion_1v_4', 50, 0],
    [examples_3, "7.5 | 7.5 | 7.5 | 7.5 | 7.5", 'CLEVR Objects', 100, 0]
]
pointe_examples = [
    ["a cake | a house", "7.5 | 7.5", 'Point-E'],
    ["a green avocado | a chair", "7.5 | 3", 'Point-E'],
    ["a toilet | a chair", "7 | 5", 'Point-E']
]

with gr.Blocks() as demo:
    gr.Markdown(
        """

# Composable Diffusion Models (ECCV 2022) - Project Page

""") gr.Markdown( """
"Mystical trees" AND "A magical pond" AND "Dark"
"Mystical trees" AND "A magical pond" AND NOT "Dark"
"A toilet" AND "A chair"
"A monitor" AND "A brown couch"
""" ) gr.Markdown( """

Compositional visual generation by composing pre-trained diffusion models with the compositional operators AND and NOT.

""") gr.Markdown( """

When composing multiple inputs, please use "|" to separate them, for example:
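`mystical trees | A dark magical pond | dark` (the default prompt below) composes three prompts.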

""") gr.Markdown( """

(Note: for composing CLEVR objects, we recommend x in the range [0.1, 0.9] and y in the range [0.25, 0.7], since the training dataset labels lie in those ranges.)
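For example, `0.1, 0.5 | 0.3, 0.5 | 0.5, 0.5 | 0.7, 0.5 | 0.9, 0.5` places five objects evenly along a horizontal line.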


""") with gr.Row(): with gr.Column(): gr.Markdown( """

### Composing natural language descriptions / objects for 2D image generation

""") with gr.Row(): text_input = gr.Textbox(value="mystical trees | A dark magical pond | dark", label="Text to image prompt") weights_input = gr.Textbox(value="7.5 | 7.5 | 7.5", label="Weights") with gr.Row(): seed_input = gr.Number(0, label="Seed") steps_input = gr.Slider(10, 200, value=50, label="Steps") with gr.Row(): model_input = gr.Radio( ['Stable_Diffusion_1v_4', 'CLEVR Objects'], type="value", label='Text to image model', value='Stable_Diffusion_1v_4') image_output = gr.Image() image_button = gr.Button("Generate") img_examples = gr.Examples( examples=image_examples, inputs=[text_input, weights_input, model_input, steps_input, seed_input] ) with gr.Column(): gr.Markdown( """

### Composing natural language descriptions for 3D asset generation

""") with gr.Row(): asset_input = gr.Textbox(value="a cake | a house", label="Text to 3D prompt") with gr.Row(): asset_weights = gr.Textbox(value="7.5 | 7.5", label="Weights") with gr.Row(): asset_model = gr.Radio(['Point-E'], type="value", label='Text to 3D model', value='Point-E') asset_output = gr.Image(label='GIF') asset_button = gr.Button("Generate") asset_examples = gr.Examples(examples=pointe_examples, inputs=[asset_input, asset_weights, asset_model]) image_button.click(compose_2D_diffusion, inputs=[text_input, weights_input, model_input, steps_input, seed_input], outputs=image_output) asset_button.click(compose_pointe, inputs=[asset_input, asset_weights, asset_model], outputs=asset_output) if __name__ == "__main__": demo.queue(max_size=5) demo.launch(debug=True)