LVM / demo.py
Emma02's picture
Add application file
a858bb2
import re
from natsort import natsorted
def natural_sort_key(s):
    """Return a key that sorts strings with embedded numbers numerically.

    The string is split on runs of ASCII digits; digit runs become ints and
    the text between them is lower-cased, e.g. 'img12.PNG' yields
    ['img', 12, '.png'].

    NOTE: this function is defined a second time further down the file; the
    later definition is the one bound at runtime.

    Args:
        s: The string (typically a file name) to build a key for.

    Returns:
        A list alternating lower-cased text (even indices, possibly '') and
        ints (odd indices).
    """
    # re.split with a capturing group always alternates text / digit-run, so
    # the position tells us what each piece is. Testing str.isdigit() instead
    # misclassifies characters such as '²' (isdigit() is True but int()
    # raises ValueError) and turns stand-alone non-ASCII digits into ints in
    # a text slot, which makes keys incomparable.
    pieces = re.split(r'([0-9]+)', s)
    return [
        int(piece) if index % 2 else piece.lower()
        for index, piece in enumerate(pieces)
    ]
def load_example_image_groups(directory):
    """Load example images grouped by the immediate subdirectories of a folder.

    Each subdirectory of ``directory`` becomes one group, keyed by the
    subdirectory name. Only files ending in .png, .jpg or .jpeg
    (case-insensitive) are loaded, in natural (numeric-aware) filename order.

    NOTE: this function is defined a second time further down the file; the
    later definition is the one bound at runtime.

    Args:
        directory: Path of the folder that contains one subfolder per group.

    Returns:
        A dict mapping subdirectory name to a list of PIL images. Empty when
        ``directory`` does not exist or is not a directory.
    """
    example_groups = {}
    # Examples are optional: a missing folder should yield no groups rather
    # than abort whatever is loading them.
    if not os.path.isdir(directory):
        return example_groups

    # os.listdir order is arbitrary; sort so the groups come out in a stable,
    # human-friendly order on every run.
    for subdir in natsorted(os.listdir(directory), key=natural_sort_key):
        subdir_path = os.path.join(directory, subdir)
        if not os.path.isdir(subdir_path):
            continue
        filenames = [
            name for name in os.listdir(subdir_path)
            if name.lower().endswith(('.png', '.jpg', '.jpeg'))
        ]
        group = []
        for filename in natsorted(filenames, key=natural_sort_key):
            img = Image.open(os.path.join(subdir_path, filename))
            # Image.open is lazy and keeps the file open; decoding now lets
            # Pillow release the handle (for single-frame images) instead of
            # holding one descriptor per example image.
            img.load()
            group.append(img)
        example_groups[subdir] = group
    return example_groups
from io import BytesIO
import gradio as gr
import uvicorn
from fastapi import FastAPI
from PIL import Image
import numpy as np
import mlxu
import os
import re
from natsort import natsorted
from .inference import MultiProcessInferenceModel
# Demo configuration declared through mlxu; main() reads these values from FLAGS.
# NOTE(review): presumably each entry can be overridden on the command line
# via mlxu's flag handling — confirm against the mlxu documentation.
FLAGS, _ = mlxu.define_flags_with_default(
    host='0.0.0.0',  # bind address handed to uvicorn.run
    port=5007,  # port handed to uvicorn.run
    dtype='float16',  # forwarded to MultiProcessInferenceModel
    checkpoint='',  # forwarded to the model; main() rejects the empty default
    torch_devices='',  # forwarded to MultiProcessInferenceModel
    context_frames=16,  # forwarded to MultiProcessInferenceModel
)
def natural_sort_key(s):
    """Return a key that sorts strings with embedded numbers numerically.

    The string is split on runs of ASCII digits; digit runs become ints and
    the text between them is lower-cased, e.g. 'img12.PNG' yields
    ['img', 12, '.png'].

    Args:
        s: The string (typically a file name) to build a key for.

    Returns:
        A list alternating lower-cased text (even indices, possibly '') and
        ints (odd indices).
    """
    # re.split with a capturing group always alternates text / digit-run, so
    # the position tells us what each piece is. Testing str.isdigit() instead
    # misclassifies characters such as '²' (isdigit() is True but int()
    # raises ValueError) and turns stand-alone non-ASCII digits into ints in
    # a text slot, which makes keys incomparable.
    pieces = re.split(r'([0-9]+)', s)
    return [
        int(piece) if index % 2 else piece.lower()
        for index, piece in enumerate(pieces)
    ]
def load_example_image_groups(directory):
    """Load example images grouped by the immediate subdirectories of a folder.

    Each subdirectory of ``directory`` becomes one group, keyed by the
    subdirectory name. Only files ending in .png, .jpg or .jpeg
    (case-insensitive) are loaded, in natural (numeric-aware) filename order.

    Args:
        directory: Path of the folder that contains one subfolder per group.

    Returns:
        A dict mapping subdirectory name to a list of PIL images. Empty when
        ``directory`` does not exist or is not a directory.
    """
    example_groups = {}
    # Examples are optional: a missing folder should yield no groups rather
    # than abort whatever is loading them.
    if not os.path.isdir(directory):
        return example_groups

    # os.listdir order is arbitrary; sort so the groups come out in a stable,
    # human-friendly order on every run.
    for subdir in natsorted(os.listdir(directory), key=natural_sort_key):
        subdir_path = os.path.join(directory, subdir)
        if not os.path.isdir(subdir_path):
            continue
        filenames = [
            name for name in os.listdir(subdir_path)
            if name.lower().endswith(('.png', '.jpg', '.jpeg'))
        ]
        group = []
        for filename in natsorted(filenames, key=natural_sort_key):
            img = Image.open(os.path.join(subdir_path, filename))
            # Image.open is lazy and keeps the file open; decoding now lets
            # Pillow release the handle (for single-frame images) instead of
            # holding one descriptor per example image.
            img.load()
            group.append(img)
        example_groups[subdir] = group
    return example_groups
def main(_):
    """Build the LVM Gradio demo and serve it with FastAPI/uvicorn.

    Creates the inference model from FLAGS, wires the upload / generate /
    example-gallery UI, mounts it at '/' and then blocks in ``uvicorn.run``.

    Args:
        _: Ignored. Presumably the leftover argv handed over by ``mlxu.run``.

    Raises:
        ValueError: If ``FLAGS.checkpoint`` is empty.
    """
    # Explicit raise rather than assert: asserts are stripped under `python -O`.
    if FLAGS.checkpoint == '':
        raise ValueError('FLAGS.checkpoint must be set to a model checkpoint')

    model = MultiProcessInferenceModel(
        checkpoint=FLAGS.checkpoint,
        torch_devices=FLAGS.torch_devices,
        dtype=FLAGS.dtype,
        context_frames=FLAGS.context_frames,
        use_lock=True,
    )

    # 256x24 checkerboard strip of 8x8 tiles, placed between frames of a
    # generated sequence so frame boundaries are visible.
    checkerboard_r1 = np.concatenate(
        [np.zeros((8, 8, 3)), np.ones((8, 8, 3)), np.zeros((8, 8, 3))], axis=1
    )
    checkerboard_r2 = np.concatenate(
        [np.ones((8, 8, 3)), np.zeros((8, 8, 3)), np.ones((8, 8, 3))], axis=1
    )
    checkerboard = np.concatenate(
        [checkerboard_r1, checkerboard_r2] * 16, axis=0
    ).astype(np.float32)

    def generate_images(input_images, n_new_frames, n_candidates, temperature=1.0, top_p=0.9):
        """Run the model on the prompt images; return one image strip per candidate.

        Raises:
            ValueError: If ``input_images`` is empty.
        """
        if len(input_images) == 0:
            raise ValueError('At least one input image is required')

        # Prompt frames: RGB, 256x256, float32 scaled to [0, 1].
        prompt = np.stack(
            [
                np.array(img.convert('RGB').resize((256, 256)), dtype=np.float32) / 255.0
                for img in input_images
            ],
            axis=0,
        )
        output_images = model([prompt], n_new_frames, n_candidates, temperature, top_p)[0]

        generated_images = []
        for candidate in output_images:
            strip = []
            for i, img in enumerate(candidate):
                strip.append(img)
                if i < len(candidate) - 1:
                    strip.append(checkerboard)
            # Clip before the uint8 cast so values slightly outside [0, 1]
            # saturate instead of wrapping around.
            pixels = np.clip(np.concatenate(strip, axis=1), 0.0, 1.0) * 255
            generated_images.append(Image.fromarray(pixels.astype(np.uint8)))
        return generated_images

    with gr.Blocks(css="""
        .small-button {
            padding: 5px 10px;
            min-width: 80px;
        }
        .large-gallery img {
            width: 100%;
            height: auto;
            max-height: 150px;
        }
    """) as demo:
        # NOTE(review): the source lost its indentation, so the nesting of the
        # layout containers below is reconstructed — confirm that the buttons
        # belong in the slider column.
        with gr.Column():
            image_list = gr.State([])
            gr.Markdown('# LVM Demo')
            gr.Markdown(f'Serving model: {FLAGS.checkpoint}')
            gr.Markdown('## Inputs')
            with gr.Row():
                upload_drag = gr.File(
                    type='binary',
                    file_types=['image'],
                    file_count='multiple',
                )
                with gr.Column():
                    gen_length_slider = gr.Slider(
                        label='Generation length',
                        minimum=1,
                        maximum=32,
                        value=1,
                        step=1,
                        interactive=True,
                    )
                    n_candidates_slider = gr.Slider(
                        label='Number of candidates',
                        minimum=1,
                        maximum=10,
                        value=1,
                        step=1,
                        interactive=True,
                    )
                    temp_slider = gr.Slider(
                        label='Temperature',
                        minimum=0,
                        maximum=2.0,
                        value=1.0,
                        interactive=True,
                    )
                    top_p_slider = gr.Slider(
                        label='Top p',
                        minimum=0,
                        maximum=1.0,
                        value=0.9,
                        interactive=True,
                    )
                    clear_btn = gr.Button(
                        value='Clear',
                        elem_classes=['small-button'],
                    )
                    generate_btn = gr.Button(
                        value='Generate',
                        interactive=False,
                        elem_classes=['small-button'],
                    )
            input_gallery = gr.Gallery(
                columns=7,
                rows=1,
                object_fit='scale-down',
            )
            gr.Markdown('## Outputs')
            output_gallery = gr.Gallery(
                columns=4,
                object_fit='scale-down',
            )

            def upload_image_fn(files, images):
                """Append the uploaded files to the prompt and reset the upload widget."""
                # Build a new list rather than mutating the state value in place.
                images = images + [Image.open(BytesIO(file)) for file in files]
                return {
                    upload_drag: None,
                    image_list: images,
                    input_gallery: images,
                    generate_btn: gr.update(interactive=True),
                }

            def clear_fn():
                """Drop all prompt and output images and disable generation."""
                return {
                    image_list: [],
                    input_gallery: [],
                    generate_btn: gr.update(interactive=False),
                    output_gallery: [],
                }

            def disable_generate_btn():
                """Grey out the Generate button while a request is running."""
                return {
                    generate_btn: gr.update(interactive=False),
                }

            def generate_fn(images, n_candidates, gen_length, temperature, top_p):
                """Run generation for the current prompt and show the results."""
                new_images = generate_images(
                    images,
                    gen_length,
                    n_candidates=n_candidates,
                    temperature=temperature,
                    top_p=top_p,
                )
                return {
                    output_gallery: new_images,
                }

            def restore_generate_btn(images):
                """Re-enable Generate after a run, but only if a prompt is still present."""
                return {
                    generate_btn: gr.update(interactive=bool(images)),
                }

            upload_drag.upload(
                upload_image_fn,
                inputs=[upload_drag, image_list],
                outputs=[upload_drag, image_list, input_gallery, generate_btn],
            )
            clear_btn.click(
                clear_fn,
                inputs=None,
                outputs=[image_list, input_gallery, generate_btn, output_gallery],
            )
            # The button is restored in its own chained step: `.then` runs even
            # when the previous step raised, so a failed generation no longer
            # leaves the button disabled for the rest of the session.
            generate_btn.click(
                disable_generate_btn,
                inputs=None,
                outputs=[generate_btn],
            ).then(
                generate_fn,
                inputs=[image_list, n_candidates_slider, gen_length_slider, temp_slider, top_p_slider],
                outputs=[output_gallery],
            ).then(
                restore_generate_btn,
                inputs=[image_list],
                outputs=[generate_btn],
            )

            # The example folder is machine-specific; without it the demo
            # simply starts with no example groups instead of crashing.
            example_dir = '/home/yutongbai/demo_images'
            if os.path.isdir(example_dir):
                example_groups = load_example_image_groups(example_dir)
            else:
                example_groups = {}

            def add_image_group_fn(group_name, images):
                """Append every image of the named example group to the prompt."""
                new_images = images + example_groups[group_name]
                return {
                    image_list: new_images,
                    input_gallery: new_images,
                    generate_btn: gr.update(interactive=True),
                }

            for group_name, group_images in example_groups.items():
                with gr.Row():
                    with gr.Column(scale=3):
                        add_button = gr.Button(value=f'Add {group_name}', elem_classes=['small-button'])
                    with gr.Column(scale=7):
                        group_gallery = gr.Gallery(
                            value=[Image.fromarray(np.array(img)) for img in group_images],
                            columns=5,
                            rows=1,
                            object_fit='scale-down',
                            label=group_name,
                            elem_classes=['large-gallery'],
                        )
                # The group name travels as a State input, so each button is
                # bound to its own group rather than the last loop value.
                add_button.click(
                    add_image_group_fn,
                    inputs=[gr.State(group_name), image_list],
                    outputs=[image_list, input_gallery, generate_btn],
                )

    app = FastAPI()
    app = gr.mount_gradio_app(app, demo, '/')
    uvicorn.run(app, host=FLAGS.host, port=FLAGS.port)
# Script entry point: hand main to mlxu's runner when executed directly.
if __name__ == "__main__":
    mlxu.run(main)