Spaces:
Running
on
Zero
Running
on
Zero
import concurrent.futures | |
import random | |
import gradio as gr | |
import requests | |
import io, base64, json | |
import spaces | |
from PIL import Image | |
from .models import IMAGE_GENERATION_MODELS, IMAGE_EDITION_MODELS, load_pipeline | |
class ModelManager:
    """Load model pipelines on demand, cache them by name, and run them.

    Two families of models are handled: image generation ("ig"), invoked with
    a text prompt, and image edition ("ie"), invoked with a source image plus
    source / target / instruction prompts. The ``*_parallel`` methods run two
    models concurrently in a thread pool and always return the outputs in the
    order (model A, model B).
    """

    def __init__(self):
        # Model name collections imported from the sibling ``models`` module.
        self.model_ig_list = IMAGE_GENERATION_MODELS
        self.model_ie_list = IMAGE_EDITION_MODELS
        # Cache: model name -> pipeline object returned by load_pipeline().
        self.loaded_models = {}

    def load_model_pipe(self, model_name):
        """Return the pipeline for ``model_name``, loading it on first use.

        NOTE(review): the cache is not guarded by a lock, and this method is
        reached from worker threads; two concurrent requests for the same
        uncached model may each call ``load_pipeline`` -- confirm that is
        acceptable.
        """
        if model_name not in self.loaded_models:
            self.loaded_models[model_name] = load_pipeline(model_name)
        return self.loaded_models[model_name]

    def _run_per_model(self, task, model_names, *task_args):
        """Call ``task(*task_args, name)`` concurrently for each model name.

        Results are returned in the same order as ``model_names`` (not in
        completion order), so callers can pair each result with its model.
        An exception raised by any task propagates to the caller.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(task, *task_args, name) for name in model_names
            ]
            # Reading the futures in submission order keeps results aligned
            # with model_names even when a later model finishes first.
            return [future.result() for future in futures]

    def _generate_image_ig(self, pipe, prompt, model_name):
        """Invoke a generation pipeline with ``prompt`` and return its output.

        ``model_name`` is accepted for interface compatibility and is unused.
        """
        return pipe(prompt=prompt)

    def generate_image_ig(self, prompt, model_name):
        """Generate from ``prompt`` using the pipeline named ``model_name``."""
        pipe = self.load_model_pipe(model_name)
        return self._generate_image_ig(pipe, prompt, model_name)

    def generate_image_ig_parallel_anony(self, prompt, model_A, model_B):
        """Generate with two models concurrently, choosing them if unnamed.

        When both ``model_A`` and ``model_B`` are empty strings, two distinct
        models are sampled at random from ``self.model_ig_list``; otherwise
        the given names are used as-is.

        Returns:
            ``(result_A, result_B, name_A, name_B)`` where each result was
            produced by the model whose name sits in the matching position.
        """
        if model_A == "" and model_B == "":
            model_names = random.sample(list(self.model_ig_list), 2)
        else:
            model_names = [model_A, model_B]
        results = self._run_per_model(self.generate_image_ig, model_names, prompt)
        return results[0], results[1], model_names[0], model_names[1]

    def generate_image_ig_parallel(self, prompt, model_A, model_B):
        """Generate with ``model_A`` and ``model_B`` concurrently.

        Returns:
            ``(result_A, result_B)`` in the order the models were given.
        """
        results = self._run_per_model(
            self.generate_image_ig, [model_A, model_B], prompt
        )
        return results[0], results[1]

    def _generate_image_ie(self, pipe, source_image, textbox_source, textbox_target, textbox_instruct):
        """Invoke an edition pipeline with keyword arguments; return its output."""
        return pipe(
            src_image=source_image,
            src_prompt=textbox_source,
            target_prompt=textbox_target,
            instruct_prompt=textbox_instruct,
        )

    def generate_image_ie(self, textbox_source, textbox_target, textbox_instruct, source_image, model_name):
        """Edit ``source_image`` using the pipeline named ``model_name``."""
        pipe = self.load_model_pipe(model_name)
        # Delegate to the helper: the pipeline must receive only the keyword
        # arguments, not itself as an extra positional argument.
        return self._generate_image_ie(
            pipe, source_image, textbox_source, textbox_target, textbox_instruct
        )

    def generate_image_ie_parallel(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B):
        """Edit ``source_image`` with ``model_A`` and ``model_B`` concurrently.

        Returns:
            ``(result_A, result_B)`` in the order the models were given.
        """
        results = self._run_per_model(
            self.generate_image_ie,
            [model_A, model_B],
            textbox_source,
            textbox_target,
            textbox_instruct,
            source_image,
        )
        return results[0], results[1]

    def generate_image_ie_parallel_anony(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B):
        """Edit with two models concurrently, choosing them if unnamed.

        When both ``model_A`` and ``model_B`` are empty strings, two distinct
        models are sampled at random from ``self.model_ie_list``; otherwise
        the given names are used as-is.

        Returns:
            ``(result_A, result_B, name_A, name_B)`` where each result was
            produced by the model whose name sits in the matching position.
        """
        if model_A == "" and model_B == "":
            model_names = random.sample(list(self.model_ie_list), 2)
        else:
            model_names = [model_A, model_B]
        results = self._run_per_model(
            self.generate_image_ie,
            model_names,
            textbox_source,
            textbox_target,
            textbox_instruct,
            source_image,
        )
        return results[0], results[1], model_names[0], model_names[1]