Spaces:
Running
on
Zero
Running
on
Zero
File size: 4,595 Bytes
e368cec 619dcd0 e368cec 944dd2b e368cec 944dd2b e368cec bd0bce1 e368cec bd0bce1 e368cec 619dcd0 2af380b e368cec bd0bce1 2af380b e368cec 5777088 e368cec a071819 e368cec 5777088 e368cec a071819 e368cec bd0bce1 619dcd0 e368cec 2af380b e368cec 5777088 e368cec a071819 e368cec 5777088 e368cec a071819 944dd2b fff5214 944dd2b 8bd6620 a071819 944dd2b 8bd6620 a071819 944dd2b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
import concurrent.futures
import random
import gradio as gr
import requests
import io, base64, json
import spaces
from PIL import Image
from .models import IMAGE_GENERATION_MODELS, IMAGE_EDITION_MODELS, VIDEO_GENERATION_MODELS, load_pipeline
class ModelManager:
def __init__(self):
self.model_ig_list = IMAGE_GENERATION_MODELS
self.model_ie_list = IMAGE_EDITION_MODELS
self.model_vg_list = VIDEO_GENERATION_MODELS
self.loaded_models = {}
def load_model_pipe(self, model_name):
if not model_name in self.loaded_models:
pipe = load_pipeline(model_name)
self.loaded_models[model_name] = pipe
else:
pipe = self.loaded_models[model_name]
return pipe
@spaces.GPU(duration=120)
def generate_image_ig(self, prompt, model_name):
pipe = self.load_model_pipe(model_name)
result = pipe(prompt=prompt)
return result
def generate_image_ig_parallel_anony(self, prompt, model_A, model_B):
if model_A == "" and model_B == "":
model_names = random.sample([model for model in self.model_ig_list], 2)
else:
model_names = [model_A, model_B]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(self.generate_image_ig, prompt, model) for model in model_names]
results = [future.result() for future in futures]
return results[0], results[1], model_names[0], model_names[1]
def generate_image_ig_parallel(self, prompt, model_A, model_B):
model_names = [model_A, model_B]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(self.generate_image_ig, prompt, model) for model in model_names]
results = [future.result() for future in futures]
return results[0], results[1]
@spaces.GPU(duration=150)
def generate_image_ie(self, textbox_source, textbox_target, textbox_instruct, source_image, model_name):
pipe = self.load_model_pipe(model_name)
result = pipe(src_image = source_image, src_prompt = textbox_source, target_prompt = textbox_target, instruct_prompt = textbox_instruct)
return result
def generate_image_ie_parallel(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B):
model_names = [model_A, model_B]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.generate_image_ie, textbox_source, textbox_target, textbox_instruct, source_image,
model) for model in model_names]
results = [future.result() for future in futures]
return results[0], results[1]
def generate_image_ie_parallel_anony(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B):
if model_A == "" and model_B == "":
model_names = random.sample([model for model in self.model_ie_list], 2)
else:
model_names = [model_A, model_B]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(self.generate_image_ie, textbox_source, textbox_target, textbox_instruct, source_image, model) for model in model_names]
results = [future.result() for future in futures]
return results[0], results[1], model_names[0], model_names[1]
@spaces.GPU(duration=180)
def generate_video_vg(self, prompt, model_name):
pipe = self.load_model_pipe(model_name)
result = pipe(prompt=prompt)
return result
def generate_video_vg_parallel_anony(self, prompt, model_A, model_B):
if model_A == "" and model_B == "":
model_names = random.sample([model for model in self.model_vg_list], 2)
else:
model_names = [model_A, model_B]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(self.generate_video_vg, prompt, model) for model in model_names]
results = [future.result() for future in futures]
return results[0], results[1], model_names[0], model_names[1]
def generate_video_vg_parallel(self, prompt, model_A, model_B):
model_names = [model_A, model_B]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(self.generate_video_vg, prompt, model) for model in model_names]
results = [future.result() for future in futures]
return results[0], results[1] |