Spaces:
Running
Running
from impact.utils import any_typ, ByPassTypeTuple, make_3d_mask | |
import comfy_extras.nodes_mask | |
from nodes import MAX_RESOLUTION | |
import torch | |
import comfy | |
import sys | |
import nodes | |
import re | |
import impact.core as core | |
from server import PromptServer | |
import inspect | |
class GeneralSwitch:
    """Switch node that forwards one of its numbered ``inputN`` slots.

    The ``select`` widget chooses which ``input<select>`` value is passed
    through.  Besides the value, the node outputs the slot label found in the
    workflow metadata (falling back to the slot name) and the selected index.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification of the node.

        Declared as a classmethod so it can be called on the class itself
        without an instance.  When the newer execution model is available and
        the call originates from a function named ``get_input_info`` two
        frames up, the optional inputs are replaced by a container that
        claims to contain every key, so any ``inputN`` name is accepted.
        """
        dyn_inputs = {"input1": (any_typ, {"lazy": True, "tooltip": "Any input. When connected, one more input slot is added."}), }

        if core.is_execution_model_version_supported():
            # NOTE: the frame index depends on this code living directly in
            # INPUT_TYPES; do not move it into a helper.
            stack = inspect.stack()
            if stack[2].function == 'get_input_info':
                # bypass validation
                class AllContainer:
                    def __contains__(self, item):
                        return True

                    def __getitem__(self, key):
                        return any_typ, {"lazy": True}

                dyn_inputs = AllContainer()

        inputs = {"required": {
                    "select": ("INT", {"default": 1, "min": 1, "max": 999999, "step": 1, "tooltip": "The input number you want to output among the inputs"}),
                    "sel_mode": ("BOOLEAN", {"default": False, "label_on": "select_on_prompt", "label_off": "select_on_execution", "forceInput": False,
                                             "tooltip": "In the case of 'select_on_execution', the selection is dynamically determined at the time of workflow execution. 'select_on_prompt' is an option that exists for older versions of ComfyUI, and it makes the decision before the workflow execution."}),
                    },
                  "optional": dyn_inputs,
                  "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO"}
                  }

        return inputs

    RETURN_TYPES = (any_typ, "STRING", "INT")
    RETURN_NAMES = ("selected_value", "selected_label", "selected_index")
    OUTPUT_TOOLTIPS = ("Output is generated only from the input chosen by the 'select' value.", "Slot label of the selected input slot", "Outputs the select value as is")
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def check_lazy_status(self, *args, **kwargs):
        """Return the name of the only lazy input that has to be evaluated."""
        selected_index = int(kwargs['select'])
        input_name = f"input{selected_index}"

        print(f"SELECTED: {input_name}")

        return [input_name]

    def doit(*args, **kwargs):
        """Return ``(selected_value, selected_label, selected_index)``.

        The label is looked up in ``extra_pnginfo['workflow']['nodes']`` for
        the node whose id equals ``unique_id``; if no labelled slot is found
        the slot name itself is used.  When the selected input was not
        supplied, ``(None, "", selected_index)`` is returned.
        """
        selected_index = int(kwargs['select'])
        input_name = f"input{selected_index}"

        selected_label = input_name
        node_id = kwargs['unique_id']

        extra_pnginfo = kwargs.get('extra_pnginfo')
        if extra_pnginfo is not None:
            for node in extra_pnginfo['workflow']['nodes']:
                if str(node['id']) != node_id:
                    continue
                # A node without any input slot simply has no label to offer.
                for slot in node.get('inputs', []):
                    if slot['name'] == input_name and 'label' in slot:
                        selected_label = slot['label']
                        break
        else:
            print(f"[Impact-Pack] The switch node does not guarantee proper functioning in API mode.")

        if input_name in kwargs:
            return kwargs[input_name], selected_label, selected_index

        print(f"ImpactSwitch: invalid select index (ignored)")
        return None, "", selected_index
class LatentSwitch:
    """Select one of the ``latentN`` inputs by its 1-based index."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
                    "select": ("INT", {"default": 1, "min": 1, "max": 99999, "step": 1}),
                    "latent1": ("LATENT",),
                    },
                }

    RETURN_TYPES = ("LATENT", )

    OUTPUT_NODE = True

    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, *args, **kwargs):
        """Return ``(latent<select>,)``, falling back to ``latent1``.

        The fallback is used (with a console message) when no input named
        ``latent<select>`` was passed.
        """
        input_name = f"latent{int(kwargs['select'])}"

        if input_name not in kwargs:
            print(f"LatentSwitch: invalid select index ('latent1' is selected)")
            input_name = 'latent1'

        return (kwargs[input_name],)
class ImageMaskSwitch:
    """Select one of up to four (image, mask) pairs by index."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
                    "select": ("INT", {"default": 1, "min": 1, "max": 4, "step": 1}),
                    "images1": ("IMAGE",),
                    },

                "optional": {
                    "mask1_opt": ("MASK",),
                    "images2_opt": ("IMAGE",),
                    "mask2_opt": ("MASK",),
                    "images3_opt": ("IMAGE",),
                    "mask3_opt": ("MASK",),
                    "images4_opt": ("IMAGE",),
                    "mask4_opt": ("MASK",),
                    },
                }

    RETURN_TYPES = ("IMAGE", "MASK",)

    OUTPUT_NODE = True

    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, select, images1, mask1_opt=None, images2_opt=None, mask2_opt=None, images3_opt=None, mask3_opt=None,
             images4_opt=None, mask4_opt=None):
        """Return the ``(images, mask)`` pair chosen by ``select``.

        Values 1, 2 and 3 pick the corresponding pair; every other value
        picks the fourth pair.  Unconnected optional inputs yield ``None``.
        """
        pairs = {
            1: (images1, mask1_opt),
            2: (images2_opt, mask2_opt),
            3: (images3_opt, mask3_opt),
        }
        return pairs.get(select, (images4_opt, mask4_opt))
class GeneralInversedSwitch:
    """Route a single input to exactly one of several numbered outputs."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
                    "select": ("INT", {"default": 1, "min": 1, "max": 999999, "step": 1, "tooltip": "The output number you want to send from the input"}),
                    "input": (any_typ, {"tooltip": "Any input. When connected, one more input slot is added."}),
                    },
                "optional": {
                    "sel_mode": ("BOOLEAN", {"default": False, "label_on": "select_on_prompt", "label_off": "select_on_execution", "forceInput": False,
                                             "tooltip": "In the case of 'select_on_execution', the selection is dynamically determined at the time of workflow execution. 'select_on_prompt' is an option that exists for older versions of ComfyUI, and it makes the decision before the workflow execution."}),
                    },
                "hidden": {"prompt": "PROMPT", "unique_id": "UNIQUE_ID"},
                }

    RETURN_TYPES = ByPassTypeTuple((any_typ, ))
    OUTPUT_TOOLTIPS = ("Output occurs only from the output selected by the 'select' value.\nWhen slots are connected, additional slots are created.", )
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, select, prompt, unique_id, input, **kwargs):
        """Return one value per output slot, with ``input`` on the selected one.

        The number of outputs is derived from ``prompt``: the highest output
        index that any node input links to on this node (``unique_id``).
        Non-selected slots receive ``ExecutionBlocker(None)`` when the
        execution model supports it, otherwise ``None``.
        """
        # Evaluate the capability check once so the import and the per-slot
        # decision below can never disagree.
        blocker_supported = core.is_execution_model_version_supported()
        if blocker_supported:
            from comfy_execution.graph import ExecutionBlocker
        else:
            print("[Impact Pack] InversedSwitch: ComfyUI is outdated. The 'select_on_execution' mode cannot function properly.")

        # search max output count in prompt
        max_slot = 0
        for node in prompt.values():
            for link in node.get('inputs', {}).values():
                # Links look like [source_node_id, source_output_index].
                if isinstance(link, list) and len(link) == 2 and link[0] == unique_id:
                    max_slot = max(max_slot, link[1])

        res = []
        for slot in range(max_slot + 1):
            if select == slot + 1:
                res.append(input)
            elif blocker_supported:
                res.append(ExecutionBlocker(None))
            else:
                res.append(None)

        return res
class RemoveNoiseMask:
    """Return a copy of a latent dict without its ``noise_mask`` entry."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {"samples": ("LATENT",)}}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, samples):
        """Return ``(latent,)`` where ``latent`` is a shallow copy of ``samples``.

        The input mapping is not modified; only the ``'noise_mask'`` key is
        left out of the copy.
        """
        stripped = dict(samples)
        stripped.pop('noise_mask', None)
        return (stripped, )
class ImagePasteMasked:
    """Paste a source image onto a destination image, optionally masked."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {
            "required": {
                "destination": ("IMAGE",),
                "source": ("IMAGE",),
                "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
                "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
                "resize_source": ("BOOLEAN", {"default": False}),
            },
            "optional": {
                "mask": ("MASK",),
            }
        }

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "composite"

    CATEGORY = "image"

    def composite(self, destination, source, x, y, resize_source, mask = None):
        """Composite ``source`` onto a copy of ``destination`` at ``(x, y)``.

        Both images have their last axis moved to position 1 before calling
        ``comfy_extras.nodes_mask.composite`` and the result is moved back
        (presumably channels-last <-> channels-first; confirm against the
        helper).  The input ``destination`` is cloned first, so this method
        does not write into it.
        """
        dest_cf = destination.clone().movedim(-1, 1)
        source_cf = source.movedim(-1, 1)
        # NOTE(review): the literal 1 is passed positionally to the helper;
        # it looks like a scale multiplier -- confirm against its signature.
        pasted = comfy_extras.nodes_mask.composite(dest_cf, source_cf, x, y, mask, 1, resize_source)
        output = pasted.movedim(1, -1)
        return (output,)
from impact.utils import any_typ | |
class ImpactLogger:
    """Debug node that prints its input and echoes it to the front end."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
                    "data": (any_typ,),
                    "text": ("STRING", {"multiline": True}),
                    },
                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "unique_id": "UNIQUE_ID"},
                }

    CATEGORY = "ImpactPack/Debug"

    OUTPUT_NODE = True

    RETURN_TYPES = ()
    FUNCTION = "doit"

    def doit(self, data, text, prompt, extra_pnginfo, unique_id):
        """Print ``data`` (with its shape, if any) and the prompt, then notify the UI.

        A ``impact-node-feedback`` message carrying ``str(data)`` is sent
        through ``PromptServer`` so the node's ``text`` widget can show it.
        Returns an empty dict.
        """
        # Objects exposing ``shape`` get it printed in front of the value.
        shape = f"{data.shape} / " if hasattr(data, "shape") else ""

        print(f"[IMPACT LOGGER]: {shape}{data}")

        print(f"         PROMPT: {prompt}")

        PromptServer.instance.send_sync("impact-node-feedback", {"node_id": unique_id, "widget_name": "text", "type": "TEXT", "value": f"{data}"})
        return {}
class ImpactDummyInput:
    """Debug node without inputs that emits the constant string ``"DUMMY"``."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the (empty) input specification; callable on the class itself."""
        return {"required": {}}

    CATEGORY = "ImpactPack/Debug"

    RETURN_TYPES = (any_typ,)
    FUNCTION = "doit"

    def doit(self):
        """Return the placeholder value as a one-element tuple."""
        return ("DUMMY",)
class MasksToMaskList:
    """Split a batch of masks into a list of individual masks."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
                    "masks": ("MASK", ),
                    }
                }

    RETURN_TYPES = ("MASK", )
    OUTPUT_IS_LIST = (True, )
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Operation"

    def doit(self, masks):
        """Return ``(list_of_masks,)``.

        Iterating ``masks`` yields one entry per element along its first
        axis; each entry is passed through ``make_3d_mask``.  When ``masks``
        is ``None`` a single all-zero 64x64 float32 CPU mask is returned
        (as-is, without going through ``make_3d_mask``).
        """
        if masks is None:
            empty_mask = torch.zeros((64, 64), dtype=torch.float32, device="cpu")
            return ([empty_mask], )

        split = list(masks)

        print(f"mask len: {len(split)}")

        return ([make_3d_mask(m) for m in split], )
class MaskListToMaskBatch:
    """Concatenate a list of masks into a single batch along axis 0."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
                    "mask": ("MASK", ),
                    }
                }

    INPUT_IS_LIST = True

    RETURN_TYPES = ("MASK", )
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Operation"

    def doit(self, mask):
        """Return ``(batch,)`` built from the list ``mask``.

        Every element is normalized with ``make_3d_mask``.  Elements whose
        trailing dimensions differ from the first element's are resized with
        ``comfy.utils.common_upscale`` before concatenation.  An empty list
        yields an all-zero float32 CPU tensor.
        """
        if not mask:
            # NOTE(review): this fallback is 4-D (1, 1, 64, 64) while the
            # other branches return what make_3d_mask produces -- confirm
            # whether a 3-D tensor was intended.
            empty_mask = torch.zeros((1, 64, 64), dtype=torch.float32, device="cpu").unsqueeze(0)
            return (empty_mask,)

        batch = make_3d_mask(mask[0])

        for item in mask[1:]:
            item = make_3d_mask(item)
            if batch.shape[1:] != item.shape[1:]:
                # NOTE(review): movedim(-1, 1) mirrors the image nodes'
                # layout shuffle; verify it is appropriate for masks.
                item = comfy.utils.common_upscale(item.movedim(-1, 1), batch.shape[2], batch.shape[1], "lanczos", "center").movedim(1, -1)
            batch = torch.cat((batch, item), dim=0)

        return (batch,)
class ImageListToImageBatch:
    """Concatenate a list of image tensors into one batch along axis 0."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
                    "images": ("IMAGE", ),
                    }
                }

    INPUT_IS_LIST = True

    RETURN_TYPES = ("IMAGE", )
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Operation"

    def doit(self, images):
        """Return ``(batch,)`` built from the list ``images``.

        The first element defines the target size; later elements whose
        trailing dimensions differ are resized with
        ``comfy.utils.common_upscale`` before being concatenated.  A
        one-element list returns that element unchanged.

        Raises:
            IndexError: if ``images`` is empty.
        """
        batch = images[0]

        for image in images[1:]:
            if batch.shape[1:] != image.shape[1:]:
                # Move the last axis to position 1 for the upscale helper and
                # back afterwards (presumably channels-last <-> channels-first).
                image = comfy.utils.common_upscale(image.movedim(-1, 1), batch.shape[2], batch.shape[1], "lanczos", "center").movedim(1, -1)
            batch = torch.cat((batch, image), dim=0)

        return (batch,)
class ImageBatchToImageList:
    """Split an image batch into a list of single-image batches."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {"image": ("IMAGE",), }}

    RETURN_TYPES = ("IMAGE",)
    OUTPUT_IS_LIST = (True,)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, image):
        """Return ``(images,)`` with one length-1 slice per entry of axis 0.

        Each slice keeps the leading axis, so every list element has the
        same number of dimensions as ``image``.
        """
        batch_size = image.shape[0]
        images = [image[index:index + 1, ...] for index in range(batch_size)]
        return (images, )
class MakeAnyList:
    """Collect all connected inputs into a list, skipping ``None`` values."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {
            "required": {},
            "optional": {"value1": (any_typ,), }
        }

    RETURN_TYPES = (any_typ,)
    OUTPUT_IS_LIST = (True,)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, **kwargs):
        """Return ``(values,)`` with every non-``None`` keyword value, in call order."""
        values = [value for value in kwargs.values() if value is not None]
        return (values, )
class MakeMaskList:
    """Collect all connected mask inputs into a list."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {"mask1": ("MASK",), }}

    RETURN_TYPES = ("MASK",)
    OUTPUT_IS_LIST = (True,)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, **kwargs):
        """Return ``(masks,)`` holding every keyword value in call order."""
        return (list(kwargs.values()), )
class MakeImageList:
    """Collect all connected image inputs into a list."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {"image1": ("IMAGE",), }}

    RETURN_TYPES = ("IMAGE",)
    OUTPUT_IS_LIST = (True,)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, **kwargs):
        """Return ``(images,)`` holding every keyword value in call order."""
        return (list(kwargs.values()), )
class MakeImageBatch:
    """Concatenate all connected image inputs into one batch along axis 0."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {"image1": ("IMAGE",), }}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, **kwargs):
        """Return ``(batch,)`` starting with ``image1`` followed by the other inputs.

        ``image1`` defines the target size; any other input whose trailing
        dimensions differ is resized with ``comfy.utils.common_upscale``
        before concatenation.  With only ``image1`` connected it is returned
        unchanged.
        """
        batch = kwargs.pop('image1')

        for image in kwargs.values():
            if batch.shape[1:] != image.shape[1:]:
                # Move the last axis to position 1 for the upscale helper and
                # back afterwards (presumably channels-last <-> channels-first).
                image = comfy.utils.common_upscale(image.movedim(-1, 1), batch.shape[2], batch.shape[1], "lanczos", "center").movedim(1, -1)
            batch = torch.cat((batch, image), dim=0)

        return (batch,)
class MakeMaskBatch:
    """Concatenate all connected mask inputs into one batch along axis 0."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {"mask1": ("MASK",), }}

    RETURN_TYPES = ("MASK",)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, **kwargs):
        """Return ``(batch,)`` starting with ``mask1`` followed by the other inputs.

        Every mask, including ``mask1``, is normalized with the
        ``make_3d_mask`` helper imported at the top of this file (the
        previous ``utils.make_3d_mask`` spelling referenced a module name
        that is not imported here).  Masks whose trailing dimensions differ
        from ``mask1`` are resized with ``comfy.utils.common_upscale``.
        """
        # Normalize mask1 too so torch.cat sees tensors of equal rank, as the
        # sibling MaskListToMaskBatch node does for its first element.
        batch = make_3d_mask(kwargs.pop('mask1'))

        for mask in kwargs.values():
            mask = make_3d_mask(mask)
            if batch.shape[1:] != mask.shape[1:]:
                # NOTE(review): movedim(-1, 1) mirrors the image nodes'
                # layout shuffle; verify it is appropriate for masks.
                mask = comfy.utils.common_upscale(mask.movedim(-1, 1), batch.shape[2], batch.shape[1], "lanczos", "center").movedim(1, -1)
            batch = torch.cat((batch, mask), dim=0)

        return (batch,)
class ReencodeLatent:
    """Decode a latent with one VAE and re-encode it with another."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
                    "samples": ("LATENT", ),
                    "tile_mode": (["None", "Both", "Decode(input) only", "Encode(output) only"],),
                    "input_vae": ("VAE", ),
                    "output_vae": ("VAE", ),
                    "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64}),
                    },
                }

    CATEGORY = "ImpactPack/Util"

    RETURN_TYPES = ("LATENT", )
    FUNCTION = "doit"

    def doit(self, samples, tile_mode, input_vae, output_vae, tile_size=512):
        """Decode ``samples`` with ``input_vae`` and encode the pixels with ``output_vae``.

        ``tile_mode`` chooses, independently for each step, between the
        tiled and the plain ComfyUI VAE node: ``"Both"`` tiles both steps,
        ``"Decode(input) only"`` / ``"Encode(output) only"`` tile just one,
        anything else tiles neither.  The encoder node's return value is
        passed through unchanged.
        """
        tiled_decode = tile_mode in ("Both", "Decode(input) only")
        tiled_encode = tile_mode in ("Both", "Encode(output) only")

        if tiled_decode:
            pixels = nodes.VAEDecodeTiled().decode(input_vae, samples, tile_size)[0]
        else:
            pixels = nodes.VAEDecode().decode(input_vae, samples)[0]

        if tiled_encode:
            return nodes.VAEEncodeTiled().encode(output_vae, pixels, tile_size)
        return nodes.VAEEncode().encode(output_vae, pixels)
class ReencodeLatentPipe:
    """Variant of ``ReencodeLatent`` that takes its VAEs from basic pipes."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
                    "samples": ("LATENT", ),
                    "tile_mode": (["None", "Both", "Decode(input) only", "Encode(output) only"],),
                    "input_basic_pipe": ("BASIC_PIPE", ),
                    "output_basic_pipe": ("BASIC_PIPE", ),
                    },
                }

    CATEGORY = "ImpactPack/Util"

    RETURN_TYPES = ("LATENT", )
    FUNCTION = "doit"

    def doit(self, samples, tile_mode, input_basic_pipe, output_basic_pipe):
        """Re-encode ``samples`` using the VAEs stored in the two pipes.

        Each pipe is unpacked as a 5-tuple whose third element is the VAE.
        The work is delegated to ``ReencodeLatent.doit`` with its default
        tile size.
        """
        _, _, input_vae, _, _ = input_basic_pipe
        _, _, output_vae, _, _ = output_basic_pipe
        reencoder = ReencodeLatent()
        return reencoder.doit(samples, tile_mode, input_vae, output_vae)
class StringSelector:
    """Pick one entry out of a multi-line string by index (wrapping around)."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {"required": {
            "strings": ("STRING", {"multiline": True}),
            "multiline": ("BOOLEAN", {"default": False, "label_on": "enabled", "label_off": "disabled"}),
            "select": ("INT", {"min": 0, "max": sys.maxsize, "step": 1, "default": 0}),
        }}

    RETURN_TYPES = ("STRING",)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, strings, multiline, select):
        """Return ``(selected,)``.

        With ``multiline`` disabled every line of ``strings`` is one entry.
        With ``multiline`` enabled a line starting with ``#`` opens a new
        entry that runs until the next such line; entries are stripped and a
        leading ``#`` is removed from the selected one.  ``select`` wraps
        around modulo the number of entries.
        """
        # str.split always returns at least one element, so neither entry
        # list below can be empty and the modulo is always well defined.
        lines = strings.split('\n')

        if not multiline:
            return (lines[select % len(lines)], )

        entries = []
        current = ""
        for line in lines:
            if line.startswith("#") and current:
                entries.append(current.strip())
                current = ""
            current += line + "\n"
        # The loop ran at least once, so ``current`` ends with "\n" here.
        entries.append(current.strip())

        selected = entries[select % len(entries)]
        if selected.startswith('#'):
            selected = selected[1:]

        return (selected, )
class StringListToString:
    """Join a list of strings with a configurable separator."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {
            "required": {
                "join_with": ("STRING", {"default": "\\n"}),
                "string_list": ("STRING", {"forceInput": True}),
            }
        }

    INPUT_IS_LIST = True
    RETURN_TYPES = ("STRING",)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, join_with, string_list):
        """Return ``(joined_text,)``.

        Because ``INPUT_IS_LIST`` is set, ``join_with`` arrives as a list;
        only its first element is used.  The two-character text ``\\n`` is
        interpreted as a newline.  The ``join_with`` list itself is left
        untouched.
        """
        separator = join_with[0]
        # convert \\n to newline character
        if separator == "\\n":
            separator = "\n"

        return (separator.join(string_list),)
class WildcardPromptFromString:
    """Build a labelled wildcard prompt from a delimited string of tag lists."""

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification (callable on the class itself)."""
        return {
            "required": {
                "string": ("STRING", {"forceInput": True}),
                "delimiter": ("STRING", {"multiline": False, "default": "\\n" }),
                "prefix_all": ("STRING", {"multiline": False}),
                "postfix_all": ("STRING", {"multiline": False}),
                "restrict_to_tags": ("STRING", {"multiline": False}),
                "exclude_tags": ("STRING", {"multiline": False})
            },
        }

    RETURN_TYPES = ("STRING", "STRING",)
    RETURN_NAMES = ("wildcard", "segs_labels",)
    FUNCTION = "doit"

    CATEGORY = "ImpactPack/Util"

    def doit(self, string, delimiter, prefix_all, postfix_all, restrict_to_tags, exclude_tags):
        """Return ``(wildcard, segs_labels)``.

        ``string`` is split on ``delimiter`` (the text ``\\n`` means
        newline); each piece is a ``", "``-separated tag list and gets a
        1-based numeric label.  The wildcard starts with ``[LAB]`` followed
        by one ``[label] prefix tags postfix`` row per piece.  When
        ``restrict_to_tags`` / ``exclude_tags`` are given, tags are filtered
        and de-duplicated while keeping their original order, so the output
        is deterministic.  ``segs_labels`` is the ``", "``-joined label list.
        """
        # convert \\n to newline character
        if delimiter == "\\n":
            delimiter = "\n"

        # some sanity checks and normalization for later processing
        prefix_all = "" if prefix_all is None else prefix_all
        postfix_all = "" if postfix_all is None else postfix_all
        restrict_list = ("" if restrict_to_tags is None else restrict_to_tags).split(", ")
        exclude_list = ("" if exclude_tags is None else exclude_tags).split(", ")

        # An empty widget splits to [""], which means "no filter".
        restrict_set = set(restrict_list) if restrict_list != [""] else None
        exclude_set = set(exclude_list) if exclude_list != [""] else None

        # build the wildcard prompt per list entry
        rows = ["[LAB]"]
        labels = []
        for label_no, entry in enumerate(string.split(delimiter), start=1):
            label = str(label_no)
            labels.append(label)

            tags = entry.split(", ")

            # restrict to tags (dict.fromkeys de-duplicates, keeping order)
            if restrict_set is not None:
                tags = [tag for tag in dict.fromkeys(tags) if tag in restrict_set]

            # remove tags
            if exclude_set is not None:
                tags = [tag for tag in dict.fromkeys(tags) if tag not in exclude_set]

            # next row: <LABEL> <PREFIX> <TAGS> <POSTFIX>
            rows.append(f'[{label}] {prefix_all} {", ".join(tags)} {postfix_all}'.strip())

        output = "\n".join(rows)

        # clean string: fixup double spaces, commas etc.
        output = re.sub(r' ,', ',', output)
        output = re.sub(r' +', ' ', output)
        output = re.sub(r',,+', ',', output)
        output = re.sub(r'\n, ', '\n', output)

        return output, ", ".join(labels)