|
from jaa import JaaCore |
|
from roop.utilities import get_device |
|
|
|
|
|
from typing import Any |
|
|
|
version = "4.0.0" |
|
|
|
class ChainImgProcessor(JaaCore): |
|
|
|
def __init__(self): |
|
JaaCore.__init__(self) |
|
|
|
self.processors:dict = { |
|
} |
|
|
|
self.processors_objects:dict[str,list[ChainImgPlugin]] = {} |
|
|
|
self.default_chain = "" |
|
self.init_on_start = "" |
|
|
|
self.inited_processors = [] |
|
|
|
self.is_demo_row_render = False |
|
|
|
def process_plugin_manifest(self, modname, manifest): |
|
|
|
if "img_processor" in manifest: |
|
for cmd in manifest["img_processor"].keys(): |
|
self.processors[cmd] = manifest["img_processor"][cmd] |
|
|
|
return manifest |
|
|
|
def init_with_plugins(self): |
|
self.init_plugins(["core"]) |
|
self.display_init_info() |
|
|
|
|
|
init_on_start_arr = self.init_on_start.split(",") |
|
for proc_id in init_on_start_arr: |
|
self.init_processor(proc_id) |
|
|
|
def run_chain(self, img, params:dict[str,Any] = None, chain:str = None, thread_index:int = 0): |
|
if chain is None: |
|
chain = self.default_chain |
|
if params is None: |
|
params = {} |
|
params["_thread_index"] = thread_index |
|
chain_ar = chain.split(",") |
|
|
|
for proc_id in chain_ar: |
|
if proc_id != "": |
|
if not proc_id in self.inited_processors: |
|
self.init_processor(proc_id) |
|
|
|
|
|
|
|
|
|
if self.is_demo_row_render: |
|
import cv2 |
|
import numpy as np |
|
height, width, channels = img.shape |
|
img_blank = np.zeros((height+30, width*(1+len(chain_ar)), 3), dtype=np.uint8) |
|
img_blank.fill(255) |
|
|
|
y = 30 |
|
x = 0 |
|
img_blank[y:y + height, x:x + width] = img |
|
|
|
|
|
font_scale = 1 |
|
thickness = 2 |
|
|
|
|
|
font_face = cv2.FONT_HERSHEY_SIMPLEX |
|
|
|
cv2.putText(img_blank, "original", (x+4, y-7), font_face, font_scale, (0, 0, 0), thickness) |
|
|
|
|
|
i = 0 |
|
for proc_id in chain_ar: |
|
i += 1 |
|
if proc_id != "": |
|
|
|
y = 30 |
|
img = self.processors_objects[proc_id][thread_index].process(img,params) |
|
if self.is_demo_row_render: |
|
x = width*i |
|
img_blank[y:y + height, x:x + width] = img |
|
cv2.putText(img_blank, proc_id, (x + 4, y - 7), font_face, font_scale, (0, 0, 0), thickness) |
|
|
|
if self.is_demo_row_render: |
|
return img_blank, params |
|
|
|
return img, params |
|
|
|
|
|
def fill_processors_for_thread_chains(self, threads:int = 1, chain:str = None): |
|
if chain is None: |
|
chain = self.default_chain |
|
|
|
chain_ar = chain.split(",") |
|
|
|
for processor_id in chain_ar: |
|
if processor_id != "": |
|
if self.processors_objects.get(processor_id) is None: |
|
self.processors_objects[processor_id] = [] |
|
while len(self.processors_objects[processor_id]) < threads: |
|
self.add_processor_to_list(processor_id) |
|
|
|
def add_processor_to_list(self, processor_id: str): |
|
obj = self.processors[processor_id](self) |
|
obj.init_plugin() |
|
if self.processors_objects.get(processor_id) is None: |
|
self.processors_objects[processor_id] = [] |
|
self.processors_objects[processor_id].append(obj) |
|
def init_processor(self, processor_id: str): |
|
if processor_id == "": |
|
return |
|
|
|
if processor_id in self.inited_processors: |
|
return |
|
|
|
try: |
|
if self.verbose: |
|
self.print_blue("TRY: init processor plugin '{0}'...".format(processor_id)) |
|
self.add_processor_to_list(processor_id) |
|
self.inited_processors.append(processor_id) |
|
if self.verbose: |
|
self.print_blue("SUCCESS: '{0}' initialized!".format(processor_id)) |
|
|
|
except Exception as e: |
|
self.print_error("Error init processor plugin {0}...".format(processor_id), e) |
|
|
|
|
|
def display_init_info(self): |
|
if self.verbose: |
|
print("ChainImgProcessor v{0}:".format(version)) |
|
self.format_print_key_list("processors:", self.processors.keys()) |
|
|
|
def format_print_key_list(self, key:str, value:list): |
|
print(key+": ".join(value)) |
|
|
|
def print_error(self,err_txt,e:Exception = None): |
|
print(err_txt,"red") |
|
|
|
|
|
import traceback |
|
traceback.print_exc() |
|
|
|
def print_red(self,txt): |
|
print(txt) |
|
|
|
def print_blue(self, txt): |
|
print(txt) |
|
|
|
class ChainImgPlugin: |
|
|
|
device = 'cpu' |
|
|
|
def __init__(self, core: ChainImgProcessor): |
|
self.core = core |
|
self.device = get_device() |
|
|
|
def init_plugin(self): |
|
pass |
|
def process(self, img, params:dict): |
|
return img |
|
|
|
_img_processor:ChainImgProcessor = None |
|
def get_single_image_processor() -> ChainImgProcessor: |
|
global _img_processor |
|
if _img_processor is None: |
|
_img_processor = ChainImgProcessor() |
|
_img_processor.init_with_plugins() |
|
return _img_processor |