Spaces:
Running
Running
import sys | |
import time | |
import execution | |
import impact.impact_server | |
from server import PromptServer | |
from impact.utils import any_typ | |
import impact.core as core | |
import re | |
import nodes | |
import traceback | |
class ImpactCompare: | |
def INPUT_TYPES(cls): | |
return { | |
"required": { | |
"cmp": (['a = b', 'a <> b', 'a > b', 'a < b', 'a >= b', 'a <= b', 'tt', 'ff'],), | |
"a": (any_typ, ), | |
"b": (any_typ, ), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = ("BOOLEAN", ) | |
def doit(self, cmp, a, b): | |
if cmp == "a = b": | |
return (a == b, ) | |
elif cmp == "a <> b": | |
return (a != b, ) | |
elif cmp == "a > b": | |
return (a > b, ) | |
elif cmp == "a < b": | |
return (a < b, ) | |
elif cmp == "a >= b": | |
return (a >= b, ) | |
elif cmp == "a <= b": | |
return (a <= b, ) | |
elif cmp == 'tt': | |
return (True, ) | |
else: | |
return (False, ) | |
class ImpactNotEmptySEGS: | |
def INPUT_TYPES(cls): | |
return {"required": {"segs": ("SEGS",)}} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = ("BOOLEAN", ) | |
def doit(self, segs): | |
return (segs[1] != [], ) | |
class ImpactConditionalBranch: | |
def INPUT_TYPES(cls): | |
return { | |
"required": { | |
"cond": ("BOOLEAN",), | |
"tt_value": (any_typ,{"lazy": True}), | |
"ff_value": (any_typ,{"lazy": True}), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = (any_typ, ) | |
def check_lazy_status(self, cond, tt_value=None, ff_value=None): | |
if cond and tt_value is None: | |
return ["tt_value"] | |
if not cond and ff_value is None: | |
return ["ff_value"] | |
def doit(self, cond, tt_value=None, ff_value=None): | |
if cond: | |
return (tt_value,) | |
else: | |
return (ff_value,) | |
class ImpactConditionalBranchSelMode: | |
def INPUT_TYPES(cls): | |
if not core.is_execution_model_version_supported(): | |
required_inputs = { | |
"cond": ("BOOLEAN",), | |
"sel_mode": ("BOOLEAN", {"default": True, "label_on": "select_on_prompt", "label_off": "select_on_execution"}), | |
} | |
else: | |
required_inputs = { | |
"cond": ("BOOLEAN",), | |
} | |
return { | |
"required": required_inputs, | |
"optional": { | |
"tt_value": (any_typ,), | |
"ff_value": (any_typ,), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = (any_typ, ) | |
def doit(self, cond, tt_value=None, ff_value=None, **kwargs): | |
print(f'tt={tt_value is None}\nff={ff_value is None}') | |
if cond: | |
return (tt_value,) | |
else: | |
return (ff_value,) | |
class ImpactConvertDataType: | |
def __init__(self): | |
pass | |
def INPUT_TYPES(cls): | |
return {"required": {"value": (any_typ,)}} | |
RETURN_TYPES = ("STRING", "FLOAT", "INT", "BOOLEAN") | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
def is_number(string): | |
pattern = re.compile(r'^[-+]?[0-9]*\.?[0-9]+$') | |
return bool(pattern.match(string)) | |
def doit(self, value): | |
if self.is_number(str(value)): | |
num = value | |
else: | |
if str.lower(str(value)) != "false": | |
num = 1 | |
else: | |
num = 0 | |
return (str(value), float(num), int(float(num)), bool(float(num)), ) | |
class ImpactIfNone: | |
def __init__(self): | |
pass | |
def INPUT_TYPES(cls): | |
return { | |
"required": {}, | |
"optional": {"signal": (any_typ,), "any_input": (any_typ,), } | |
} | |
RETURN_TYPES = (any_typ, "BOOLEAN") | |
RETURN_NAMES = ("signal_opt", "bool") | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
def doit(self, signal=None, any_input=None): | |
if any_input is None: | |
return (signal, False, ) | |
else: | |
return (signal, True, ) | |
class ImpactLogicalOperators: | |
def INPUT_TYPES(cls): | |
return { | |
"required": { | |
"operator": (['and', 'or', 'xor'],), | |
"bool_a": ("BOOLEAN", {"forceInput": True}), | |
"bool_b": ("BOOLEAN", {"forceInput": True}), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = ("BOOLEAN", ) | |
def doit(self, operator, bool_a, bool_b): | |
if operator == "and": | |
return (bool_a and bool_b, ) | |
elif operator == "or": | |
return (bool_a or bool_b, ) | |
else: | |
return (bool_a != bool_b, ) | |
class ImpactConditionalStopIteration: | |
def INPUT_TYPES(cls): | |
return { | |
"required": { "cond": ("BOOLEAN", {"forceInput": True}), }, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = () | |
OUTPUT_NODE = True | |
def doit(self, cond): | |
if cond: | |
PromptServer.instance.send_sync("stop-iteration", {}) | |
return {} | |
class ImpactNeg: | |
def INPUT_TYPES(cls): | |
return { | |
"required": { "value": ("BOOLEAN", {"forceInput": True}), }, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = ("BOOLEAN", ) | |
def doit(self, value): | |
return (not value, ) | |
class ImpactInt: | |
def INPUT_TYPES(cls): | |
return { | |
"required": { | |
"value": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = ("INT", ) | |
def doit(self, value): | |
return (value, ) | |
class ImpactFloat: | |
def INPUT_TYPES(cls): | |
return { | |
"required": { | |
"value": ("FLOAT", {"default": 1.0, "min": -3.402823466e+38, "max": 3.402823466e+38}), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = ("FLOAT", ) | |
def doit(self, value): | |
return (value, ) | |
class ImpactBoolean: | |
def INPUT_TYPES(cls): | |
return { | |
"required": { | |
"value": ("BOOLEAN", {"default": False}), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = ("BOOLEAN", ) | |
def doit(self, value): | |
return (value, ) | |
class ImpactValueSender: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"value": (any_typ, ), | |
"link_id": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), | |
}, | |
"optional": { | |
"signal_opt": (any_typ,), | |
} | |
} | |
OUTPUT_NODE = True | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = (any_typ, ) | |
RETURN_NAMES = ("signal", ) | |
def doit(self, value, link_id=0, signal_opt=None): | |
PromptServer.instance.send_sync("value-send", {"link_id": link_id, "value": value}) | |
return (signal_opt, ) | |
class ImpactIntConstSender: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"signal": (any_typ, ), | |
"value": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), | |
"link_id": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), | |
}, | |
} | |
OUTPUT_NODE = True | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = () | |
def doit(self, signal, value, link_id=0): | |
PromptServer.instance.send_sync("value-send", {"link_id": link_id, "value": value}) | |
return {} | |
class ImpactValueReceiver: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"typ": (["STRING", "INT", "FLOAT", "BOOLEAN"], ), | |
"value": ("STRING", {"default": ""}), | |
"link_id": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = (any_typ, ) | |
def doit(self, typ, value, link_id=0): | |
if typ == "INT": | |
return (int(value), ) | |
elif typ == "FLOAT": | |
return (float(value), ) | |
elif typ == "BOOLEAN": | |
return (value.lower() == "true", ) | |
else: | |
return (value, ) | |
class ImpactImageInfo: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"value": ("IMAGE", ), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = ("INT", "INT", "INT", "INT") | |
RETURN_NAMES = ("batch", "height", "width", "channel") | |
def doit(self, value): | |
return (value.shape[0], value.shape[1], value.shape[2], value.shape[3]) | |
class ImpactLatentInfo: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"value": ("LATENT", ), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = ("INT", "INT", "INT", "INT") | |
RETURN_NAMES = ("batch", "height", "width", "channel") | |
def doit(self, value): | |
shape = value['samples'].shape | |
return (shape[0], shape[2] * 8, shape[3] * 8, shape[1]) | |
class ImpactMinMax: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"mode": ("BOOLEAN", {"default": True, "label_on": "max", "label_off": "min"}), | |
"a": (any_typ,), | |
"b": (any_typ,), | |
}, | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = ("INT", ) | |
def doit(self, mode, a, b): | |
if mode: | |
return (max(a, b), ) | |
else: | |
return (min(a, b),) | |
class ImpactQueueTrigger: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"signal": (any_typ,), | |
"mode": ("BOOLEAN", {"default": True, "label_on": "Trigger", "label_off": "Don't trigger"}), | |
} | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = (any_typ,) | |
RETURN_NAMES = ("signal_opt",) | |
OUTPUT_NODE = True | |
def doit(self, signal, mode): | |
if(mode): | |
PromptServer.instance.send_sync("impact-add-queue", {}) | |
return (signal,) | |
class ImpactQueueTriggerCountdown: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"count": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), | |
"total": ("INT", {"default": 10, "min": 1, "max": 0xffffffffffffffff}), | |
"mode": ("BOOLEAN", {"default": True, "label_on": "Trigger", "label_off": "Don't trigger"}), | |
}, | |
"optional": {"signal": (any_typ,),}, | |
"hidden": {"unique_id": "UNIQUE_ID"} | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = (any_typ, "INT", "INT") | |
RETURN_NAMES = ("signal_opt", "count", "total") | |
OUTPUT_NODE = True | |
def doit(self, count, total, mode, unique_id, signal=None): | |
if (mode): | |
if count < total - 1: | |
PromptServer.instance.send_sync("impact-node-feedback", | |
{"node_id": unique_id, "widget_name": "count", "type": "int", "value": count+1}) | |
PromptServer.instance.send_sync("impact-add-queue", {}) | |
if count >= total - 1: | |
PromptServer.instance.send_sync("impact-node-feedback", | |
{"node_id": unique_id, "widget_name": "count", "type": "int", "value": 0}) | |
return (signal, count, total) | |
class ImpactSetWidgetValue: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"signal": (any_typ,), | |
"node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), | |
"widget_name": ("STRING", {"multiline": False}), | |
}, | |
"optional": { | |
"boolean_value": ("BOOLEAN", {"forceInput": True}), | |
"int_value": ("INT", {"forceInput": True}), | |
"float_value": ("FLOAT", {"forceInput": True}), | |
"string_value": ("STRING", {"forceInput": True}), | |
} | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = (any_typ,) | |
RETURN_NAMES = ("signal_opt",) | |
OUTPUT_NODE = True | |
def doit(self, signal, node_id, widget_name, boolean_value=None, int_value=None, float_value=None, string_value=None, ): | |
kind = None | |
if boolean_value is not None: | |
value = boolean_value | |
kind = "BOOLEAN" | |
elif int_value is not None: | |
value = int_value | |
kind = "INT" | |
elif float_value is not None: | |
value = float_value | |
kind = "FLOAT" | |
elif string_value is not None: | |
value = string_value | |
kind = "STRING" | |
else: | |
value = None | |
if value is not None: | |
PromptServer.instance.send_sync("impact-node-feedback", | |
{"node_id": node_id, "widget_name": widget_name, "type": kind, "value": value}) | |
return (signal,) | |
class ImpactNodeSetMuteState: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"signal": (any_typ,), | |
"node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), | |
"set_state": ("BOOLEAN", {"default": True, "label_on": "active", "label_off": "mute"}), | |
} | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = (any_typ,) | |
RETURN_NAMES = ("signal_opt",) | |
OUTPUT_NODE = True | |
def doit(self, signal, node_id, set_state): | |
PromptServer.instance.send_sync("impact-node-mute-state", {"node_id": node_id, "is_active": set_state}) | |
return (signal,) | |
class ImpactSleep: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"signal": (any_typ,), | |
"seconds": ("FLOAT", {"default": 0.5, "min": 0, "max": 3600}), | |
} | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = (any_typ,) | |
RETURN_NAMES = ("signal_opt",) | |
OUTPUT_NODE = True | |
def doit(self, signal, seconds): | |
time.sleep(seconds) | |
return (signal,) | |
error_skip_flag = False | |
try: | |
import cm_global | |
def filter_message(str): | |
global error_skip_flag | |
if "IMPACT-PACK-SIGNAL: STOP CONTROL BRIDGE" in str: | |
return True | |
elif error_skip_flag and "ERROR:root:!!! Exception during processing !!!\n" == str: | |
error_skip_flag = False | |
return True | |
else: | |
return False | |
cm_global.try_call(api='cm.register_message_collapse', f=filter_message) | |
except Exception as e: | |
print(f"[WARN] ComfyUI-Impact-Pack: `ComfyUI` or `ComfyUI-Manager` is an outdated version.") | |
pass | |
def workflow_to_map(workflow): | |
nodes = {} | |
links = {} | |
for link in workflow['links']: | |
links[link[0]] = link[1:] | |
for node in workflow['nodes']: | |
nodes[str(node['id'])] = node | |
return nodes, links | |
class ImpactRemoteBoolean: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), | |
"widget_name": ("STRING", {"multiline": False}), | |
"value": ("BOOLEAN", {"default": True, "label_on": "True", "label_off": "False"}), | |
}} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = () | |
OUTPUT_NODE = True | |
def doit(self, **kwargs): | |
return {} | |
class ImpactRemoteInt: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), | |
"widget_name": ("STRING", {"multiline": False}), | |
"value": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff}), | |
}} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic/_for_test" | |
RETURN_TYPES = () | |
OUTPUT_NODE = True | |
def doit(self, **kwargs): | |
return {} | |
class ImpactControlBridge: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"value": (any_typ,), | |
"mode": ("BOOLEAN", {"default": True, "label_on": "Active", "label_off": "Stop/Mute/Bypass"}), | |
"behavior": (["Stop", "Mute", "Bypass"], ), | |
}, | |
"hidden": {"unique_id": "UNIQUE_ID", "prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"} | |
} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Logic" | |
RETURN_TYPES = (any_typ,) | |
RETURN_NAMES = ("value",) | |
OUTPUT_NODE = True | |
DESCRIPTION = ("When behavior is Stop and mode is active, the input value is passed directly to the output.\n" | |
"When behavior is Mute/Bypass and mode is active, the node connected to the output is changed to active state.\n" | |
"When behavior is Stop and mode is Stop/Mute/Bypass, the workflow execution of the current node is halted.\n" | |
"When behavior is Mute/Bypass and mode is Stop/Mute/Bypass, the node connected to the output is changed to Mute/Bypass state.") | |
def IS_CHANGED(self, value, mode, behavior="Stop", unique_id=None, prompt=None, extra_pnginfo=None): | |
if behavior == "Stop": | |
return value, mode, behavior | |
else: | |
# NOTE: extra_pnginfo is not populated for IS_CHANGED. | |
# so extra_pnginfo is useless in here | |
try: | |
workflow = core.current_prompt['extra_data']['extra_pnginfo']['workflow'] | |
except: | |
print(f"[Impact Pack] core.current_prompt['extra_data']['extra_pnginfo']['workflow']") | |
return 0 | |
nodes, links = workflow_to_map(workflow) | |
next_nodes = [] | |
for link in nodes[unique_id]['outputs'][0]['links']: | |
node_id = str(links[link][2]) | |
impact.utils.collect_non_reroute_nodes(nodes, links, next_nodes, node_id) | |
return next_nodes | |
def doit(self, value, mode, behavior="Stop", unique_id=None, prompt=None, extra_pnginfo=None): | |
global error_skip_flag | |
if core.is_execution_model_version_supported(): | |
from comfy_execution.graph import ExecutionBlocker | |
else: | |
print("[Impact Pack] ImpactControlBridge: ComfyUI is outdated. The 'Stop' behavior cannot function properly.") | |
if behavior == "Stop": | |
if mode: | |
return (value, ) | |
else: | |
return (ExecutionBlocker(None), ) | |
else: | |
workflow_nodes, links = workflow_to_map(extra_pnginfo['workflow']) | |
active_nodes = [] | |
mute_nodes = [] | |
bypass_nodes = [] | |
for link in workflow_nodes[unique_id]['outputs'][0]['links']: | |
node_id = str(links[link][2]) | |
next_nodes = [] | |
impact.utils.collect_non_reroute_nodes(workflow_nodes, links, next_nodes, node_id) | |
for next_node_id in next_nodes: | |
node_mode = workflow_nodes[next_node_id]['mode'] | |
if node_mode == 0: | |
active_nodes.append(next_node_id) | |
elif node_mode == 2: | |
mute_nodes.append(next_node_id) | |
elif node_mode == 4: | |
bypass_nodes.append(next_node_id) | |
if mode: | |
# active | |
should_be_active_nodes = mute_nodes + bypass_nodes | |
if len(should_be_active_nodes) > 0: | |
PromptServer.instance.send_sync("impact-bridge-continue", {"node_id": unique_id, 'actives': list(should_be_active_nodes)}) | |
nodes.interrupt_processing() | |
elif behavior == "Mute" or behavior == True: | |
# mute | |
should_be_mute_nodes = active_nodes + bypass_nodes | |
if len(should_be_mute_nodes) > 0: | |
PromptServer.instance.send_sync("impact-bridge-continue", {"node_id": unique_id, 'mutes': list(should_be_mute_nodes)}) | |
nodes.interrupt_processing() | |
else: | |
# bypass | |
should_be_bypass_nodes = active_nodes + mute_nodes | |
if len(should_be_bypass_nodes) > 0: | |
PromptServer.instance.send_sync("impact-bridge-continue", {"node_id": unique_id, 'bypasses': list(should_be_bypass_nodes)}) | |
nodes.interrupt_processing() | |
return (value, ) | |
class ImpactExecutionOrderController: | |
def INPUT_TYPES(cls): | |
return {"required": { | |
"signal": (any_typ,), | |
"value": (any_typ,), | |
}} | |
FUNCTION = "doit" | |
CATEGORY = "ImpactPack/Util" | |
RETURN_TYPES = (any_typ, any_typ) | |
RETURN_NAMES = ("signal", "value") | |
def doit(self, signal, value): | |
return signal, value | |
original_handle_execution = execution.PromptExecutor.handle_execution_error | |
def handle_execution_error(**kwargs): | |
execution.PromptExecutor.handle_execution_error(**kwargs) | |