gartajackhats1985's picture
Upload 171 files
c37b2dd verified
raw
history blame
23.1 kB
import sys
import time
import execution
import impact.impact_server
from server import PromptServer
from impact.utils import any_typ
import impact.core as core
import re
import nodes
import traceback
class ImpactCompare:
    """Logic node that compares two inputs and yields a single BOOLEAN."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = ("BOOLEAN", )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a comparison selector plus operands `a` and `b`."""
        comparisons = ['a = b', 'a <> b', 'a > b', 'a < b', 'a >= b', 'a <= b', 'tt', 'ff']
        required = {
            "cmp": (comparisons,),
            "a": (any_typ, ),
            "b": (any_typ, ),
        }
        return {"required": required}

    def doit(self, cmp, a, b):
        """Evaluate the comparison named by `cmp` on `a` and `b`.

        'tt' always yields True; 'ff' and any unrecognised selector yield
        False. The result is returned as a one-element tuple.
        """
        if cmp == "a = b":
            outcome = a == b
        elif cmp == "a <> b":
            outcome = a != b
        elif cmp == "a > b":
            outcome = a > b
        elif cmp == "a < b":
            outcome = a < b
        elif cmp == "a >= b":
            outcome = a >= b
        elif cmp == "a <= b":
            outcome = a <= b
        elif cmp == 'tt':
            outcome = True
        else:
            outcome = False
        return (outcome, )
class ImpactNotEmptySEGS:
    """Logic node reporting whether a SEGS input carries any entries."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = ("BOOLEAN", )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: one required SEGS input."""
        required = {"segs": ("SEGS",)}
        return {"required": required}

    def doit(self, segs):
        """Return `(segs[1] != [],)`.

        Only the second element of `segs` is inspected.
        # presumably that element is the list of segments — confirm against SEGS producers
        """
        entries = segs[1]
        has_entries = entries != []
        return (has_entries, )
class ImpactConditionalBranch:
    """Select between two lazily-declared inputs according to a boolean."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = (any_typ, )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: `cond` plus two inputs flagged `lazy`."""
        required = {
            "cond": ("BOOLEAN",),
            "tt_value": (any_typ, {"lazy": True}),
            "ff_value": (any_typ, {"lazy": True}),
        }
        return {"required": required}

    def check_lazy_status(self, cond, tt_value=None, ff_value=None):
        """Name the branch input selected by `cond` if it is still None.

        Returns a one-element list with the input name, or None when the
        selected branch already has a value.
        """
        if cond:
            if tt_value is None:
                return ["tt_value"]
        elif ff_value is None:
            return ["ff_value"]
        return None

    def doit(self, cond, tt_value=None, ff_value=None):
        """Return `(tt_value,)` when `cond` is truthy, otherwise `(ff_value,)`."""
        chosen = tt_value if cond else ff_value
        return (chosen,)
class ImpactConditionalBranchSelMode:
    """Select between two optional inputs according to a boolean.

    The `sel_mode` widget is only declared when
    `core.is_execution_model_version_supported()` is false; `doit` accepts it
    through `**kwargs` and ignores it.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec, adding `sel_mode` only on unsupported execution models."""
        required_inputs = {
            "cond": ("BOOLEAN",),
        }
        if not core.is_execution_model_version_supported():
            required_inputs["sel_mode"] = (
                "BOOLEAN",
                {"default": True, "label_on": "select_on_prompt", "label_off": "select_on_execution"},
            )

        return {
            "required": required_inputs,
            "optional": {
                "tt_value": (any_typ,),
                "ff_value": (any_typ,),
            },
        }

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = (any_typ, )

    def doit(self, cond, tt_value=None, ff_value=None, **kwargs):
        """Return `(tt_value,)` when `cond` is truthy, otherwise `(ff_value,)`.

        Extra keyword arguments (e.g. `sel_mode`) are accepted and ignored.
        """
        # The original printed this diagnostic to stdout on every execution;
        # it is kept available at debug level instead of spamming the console.
        import logging
        logging.debug("[Impact Pack] ImpactConditionalBranchSelMode: tt is None=%s, ff is None=%s",
                      tt_value is None, ff_value is None)

        if cond:
            return (tt_value,)
        return (ff_value,)
class ImpactConvertDataType:
    """Convert an arbitrary value to STRING, FLOAT, INT and BOOLEAN at once."""

    # Plain decimal literal: optional sign, optional integer part, optional
    # dot, at least one trailing digit. Compiled once instead of per call.
    _NUMBER_PATTERN = re.compile(r'^[-+]?[0-9]*\.?[0-9]+$')

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: one required input of any type."""
        return {"required": {"value": (any_typ,)}}

    RETURN_TYPES = ("STRING", "FLOAT", "INT", "BOOLEAN")
    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"

    @staticmethod
    def is_number(string):
        """Return True when `string` matches the plain decimal-literal pattern."""
        return bool(ImpactConvertDataType._NUMBER_PATTERN.match(string))

    def doit(self, value):
        """Return `(str, float, int, bool)` views of `value`.

        Values whose string form is a decimal literal are converted
        numerically. Anything else counts as 1, except the text "false"
        (case-insensitive), which counts as 0.

        Raises whatever `float()` raises for the value (e.g. OverflowError
        for integers too large for a float).
        """
        text = str(value)

        if self.is_number(text):
            num = value
            # A literal without a dot is an integer; parse it directly so
            # values beyond 2**53 are not rounded by a float round-trip.
            integral = '.' not in text
        else:
            num = 0 if text.lower() == "false" else 1
            integral = False

        as_float = float(num)
        as_int = int(text) if integral else int(as_float)

        return (text, as_float, as_int, bool(as_float), )
class ImpactIfNone:
    """Pass `signal` through together with a flag telling whether `any_input` was given."""

    RETURN_TYPES = (any_typ, "BOOLEAN")
    RETURN_NAMES = ("signal_opt", "bool")
    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: two optional inputs of any type, nothing required."""
        optional = {"signal": (any_typ,), "any_input": (any_typ,), }
        return {
            "required": {},
            "optional": optional,
        }

    def doit(self, signal=None, any_input=None):
        """Return `(signal, flag)` where `flag` is False only when `any_input` is None."""
        is_present = any_input is not None
        return (signal, is_present, )
class ImpactLogicalOperators:
    """Combine two boolean inputs with `and`, `or` or `xor`."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = ("BOOLEAN", )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: an operator selector and two forced-input booleans."""
        required = {
            "operator": (['and', 'or', 'xor'],),
            "bool_a": ("BOOLEAN", {"forceInput": True}),
            "bool_b": ("BOOLEAN", {"forceInput": True}),
        }
        return {"required": required}

    def doit(self, operator, bool_a, bool_b):
        """Apply `operator` to the operands and return the result in a tuple.

        Any operator other than "and"/"or" is treated as xor (`bool_a != bool_b`).
        """
        if operator == "and":
            combined = bool_a and bool_b
        elif operator == "or":
            combined = bool_a or bool_b
        else:
            combined = bool_a != bool_b
        return (combined, )
class ImpactConditionalStopIteration:
    """Output node that emits a "stop-iteration" message when its condition holds."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = ()
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a single forced-input boolean `cond`."""
        required = {"cond": ("BOOLEAN", {"forceInput": True}), }
        return {"required": required}

    def doit(self, cond):
        """Send "stop-iteration" through PromptServer when `cond` is truthy; return `{}`."""
        if not cond:
            return {}

        PromptServer.instance.send_sync("stop-iteration", {})
        return {}
class ImpactNeg:
    """Logical negation of a boolean input."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = ("BOOLEAN", )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a single forced-input boolean `value`."""
        required = {"value": ("BOOLEAN", {"forceInput": True}), }
        return {"required": required}

    def doit(self, value):
        """Return `(not value,)`."""
        negated = not value
        return (negated, )
class ImpactInt:
    """Constant node exposing an INT widget value as an output."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = ("INT", )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: an INT widget ranging from 0 to `sys.maxsize`."""
        widget = ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1})
        return {"required": {"value": widget}}

    def doit(self, value):
        """Return the widget value unchanged, wrapped in a tuple."""
        return (value, )
class ImpactFloat:
    """Constant node exposing a FLOAT widget value as an output."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = ("FLOAT", )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a FLOAT widget defaulting to 1.0 with symmetric bounds."""
        widget = ("FLOAT", {"default": 1.0, "min": -3.402823466e+38, "max": 3.402823466e+38})
        return {"required": {"value": widget}}

    def doit(self, value):
        """Return the widget value unchanged, wrapped in a tuple."""
        return (value, )
class ImpactBoolean:
    """Constant node exposing a BOOLEAN widget value as an output."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = ("BOOLEAN", )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a BOOLEAN widget defaulting to False."""
        widget = ("BOOLEAN", {"default": False})
        return {"required": {"value": widget}}

    def doit(self, value):
        """Return the widget value unchanged, wrapped in a tuple."""
        return (value, )
class ImpactValueSender:
    """Output node that publishes a value under a link id via a "value-send" message."""

    OUTPUT_NODE = True
    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = (any_typ, )
    RETURN_NAMES = ("signal", )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: `value`, `link_id`, and an optional pass-through signal."""
        required = {
            "value": (any_typ, ),
            "link_id": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}),
        }
        optional = {
            "signal_opt": (any_typ,),
        }
        return {"required": required, "optional": optional}

    def doit(self, value, link_id=0, signal_opt=None):
        """Send `value` tagged with `link_id` through PromptServer; return `(signal_opt,)`."""
        message = {"link_id": link_id, "value": value}
        PromptServer.instance.send_sync("value-send", message)
        return (signal_opt, )
class ImpactIntConstSender:
    """Output node that publishes an INT widget value under a link id."""

    OUTPUT_NODE = True
    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = ()

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a signal of any type, the INT `value`, and `link_id`."""
        required = {
            "signal": (any_typ, ),
            "value": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}),
            "link_id": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}),
        }
        return {"required": required}

    def doit(self, signal, value, link_id=0):
        """Send `value` tagged with `link_id` as a "value-send" message; return `{}`.

        `signal` is accepted but not used in the body.
        """
        message = {"link_id": link_id, "value": value}
        PromptServer.instance.send_sync("value-send", message)
        return {}
class ImpactValueReceiver:
    """Parse a STRING widget value into the type selected by `typ`."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = (any_typ, )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: target type selector, the string `value`, and `link_id`."""
        required = {
            "typ": (["STRING", "INT", "FLOAT", "BOOLEAN"], ),
            "value": ("STRING", {"default": ""}),
            "link_id": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}),
        }
        return {"required": required}

    def doit(self, typ, value, link_id=0):
        """Convert `value` according to `typ` and return it in a tuple.

        "INT" and "FLOAT" use `int()` / `float()` and propagate their
        exceptions; "BOOLEAN" is True only for the text "true"
        (case-insensitive); every other `typ` returns the string as is.
        `link_id` is not used in the body.
        """
        if typ == "INT":
            converted = int(value)
        elif typ == "FLOAT":
            converted = float(value)
        elif typ == "BOOLEAN":
            converted = value.lower() == "true"
        else:
            converted = value
        return (converted, )
class ImpactImageInfo:
    """Report the first four entries of an IMAGE input's `shape`."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = ("INT", "INT", "INT", "INT")
    RETURN_NAMES = ("batch", "height", "width", "channel")

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: one required IMAGE input."""
        required = {
            "value": ("IMAGE", ),
        }
        return {"required": required}

    def doit(self, value):
        """Return `shape[0..3]`, labelled batch, height, width, channel by RETURN_NAMES."""
        dims = value.shape
        return (dims[0], dims[1], dims[2], dims[3])
class ImpactLatentInfo:
    """Report dimensions derived from the `samples` entry of a LATENT input."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = ("INT", "INT", "INT", "INT")
    RETURN_NAMES = ("batch", "height", "width", "channel")

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: one required LATENT input."""
        required = {
            "value": ("LATENT", ),
        }
        return {"required": required}

    def doit(self, value):
        """Return `(shape[0], shape[2] * 8, shape[3] * 8, shape[1])` of `value['samples']`.

        # NOTE(review): the factor 8 presumably maps latent size to pixel size — confirm
        """
        dims = value['samples'].shape
        batch, channel = dims[0], dims[1]
        height = dims[2] * 8
        width = dims[3] * 8
        return (batch, height, width, channel)
class ImpactMinMax:
    """Return the larger or the smaller of two inputs, chosen by a toggle."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = ("INT", )

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a max/min toggle and operands `a` and `b`."""
        required = {
            "mode": ("BOOLEAN", {"default": True, "label_on": "max", "label_off": "min"}),
            "a": (any_typ,),
            "b": (any_typ,),
        }
        return {"required": required}

    def doit(self, mode, a, b):
        """Return `(max(a, b),)` when `mode` is truthy, otherwise `(min(a, b),)`."""
        pick = max if mode else min
        return (pick(a, b), )
class ImpactQueueTrigger:
    """Output node that emits an "impact-add-queue" message when enabled."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = (any_typ,)
    RETURN_NAMES = ("signal_opt",)
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a pass-through signal and a trigger toggle."""
        required = {
            "signal": (any_typ,),
            "mode": ("BOOLEAN", {"default": True, "label_on": "Trigger", "label_off": "Don't trigger"}),
        }
        return {"required": required}

    def doit(self, signal, mode):
        """Send "impact-add-queue" through PromptServer when `mode` is truthy; return `(signal,)`."""
        if mode:
            PromptServer.instance.send_sync("impact-add-queue", {})
        return (signal,)
class ImpactQueueTriggerCountdown:
    """Output node that re-queues while advancing its own `count` widget up to `total`."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = (any_typ, "INT", "INT")
    RETURN_NAMES = ("signal_opt", "count", "total")
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: counters, a trigger toggle, an optional signal, hidden id."""
        required = {
            "count": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "total": ("INT", {"default": 10, "min": 1, "max": 0xffffffffffffffff}),
            "mode": ("BOOLEAN", {"default": True, "label_on": "Trigger", "label_off": "Don't trigger"}),
        }
        return {
            "required": required,
            "optional": {"signal": (any_typ,), },
            "hidden": {"unique_id": "UNIQUE_ID"},
        }

    def doit(self, count, total, mode, unique_id, signal=None):
        """Advance or reset the `count` widget and return `(signal, count, total)`.

        When `mode` is truthy:
        * `count < total - 1`: send feedback setting `count` to `count + 1`,
          then send "impact-add-queue";
        * `count >= total - 1`: send feedback resetting `count` to 0.
        Nothing is sent when `mode` is falsy.
        """
        outputs = (signal, count, total)
        if not mode:
            return outputs

        threshold = total - 1

        if count < threshold:
            advance = {"node_id": unique_id, "widget_name": "count", "type": "int", "value": count+1}
            PromptServer.instance.send_sync("impact-node-feedback", advance)
            PromptServer.instance.send_sync("impact-add-queue", {})

        if count >= threshold:
            reset = {"node_id": unique_id, "widget_name": "count", "type": "int", "value": 0}
            PromptServer.instance.send_sync("impact-node-feedback", reset)

        return outputs
class ImpactSetWidgetValue:
    """Output node that pushes a value to a named widget of another node."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = (any_typ,)
    RETURN_NAMES = ("signal_opt",)
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: target node/widget plus one optional value per type."""
        required = {
            "signal": (any_typ,),
            "node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "widget_name": ("STRING", {"multiline": False}),
        }
        optional = {
            "boolean_value": ("BOOLEAN", {"forceInput": True}),
            "int_value": ("INT", {"forceInput": True}),
            "float_value": ("FLOAT", {"forceInput": True}),
            "string_value": ("STRING", {"forceInput": True}),
        }
        return {"required": required, "optional": optional}

    def doit(self, signal, node_id, widget_name, boolean_value=None, int_value=None, float_value=None, string_value=None, ):
        """Send the first non-None typed value as "impact-node-feedback"; return `(signal,)`.

        Precedence is boolean, int, float, string. Nothing is sent when all
        four are None.
        """
        candidates = (
            ("BOOLEAN", boolean_value),
            ("INT", int_value),
            ("FLOAT", float_value),
            ("STRING", string_value),
        )

        for kind, candidate in candidates:
            if candidate is None:
                continue
            payload = {"node_id": node_id, "widget_name": widget_name, "type": kind, "value": candidate}
            PromptServer.instance.send_sync("impact-node-feedback", payload)
            break  # only the highest-precedence value is sent

        return (signal,)
class ImpactNodeSetMuteState:
    """Output node that requests an active/mute state change for another node."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = (any_typ,)
    RETURN_NAMES = ("signal_opt",)
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a pass-through signal, the target node id, and the state."""
        required = {
            "signal": (any_typ,),
            "node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "set_state": ("BOOLEAN", {"default": True, "label_on": "active", "label_off": "mute"}),
        }
        return {"required": required}

    def doit(self, signal, node_id, set_state):
        """Send "impact-node-mute-state" with `node_id` and `is_active=set_state`; return `(signal,)`."""
        payload = {"node_id": node_id, "is_active": set_state}
        PromptServer.instance.send_sync("impact-node-mute-state", payload)
        return (signal,)
class ImpactSleep:
    """Output node that blocks for a number of seconds before passing its signal on."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = (any_typ,)
    RETURN_NAMES = ("signal_opt",)
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a pass-through signal and a delay between 0 and 3600."""
        required = {
            "signal": (any_typ,),
            "seconds": ("FLOAT", {"default": 0.5, "min": 0, "max": 3600}),
        }
        return {"required": required}

    def doit(self, signal, seconds):
        """Call `time.sleep(seconds)` and then return `(signal,)`."""
        time.sleep(seconds)
        return (signal,)
# When True, the next "Exception during processing" log line is collapsed by
# filter_message (and the flag is cleared again).
error_skip_flag = False

try:
    import cm_global

    def filter_message(str):
        """Tell the message-collapse hook whether a log message should be collapsed.

        Returns True for any message containing the control-bridge stop
        signal, and for one exact "Exception during processing" line while
        `error_skip_flag` is set. Returns False otherwise.

        The parameter keeps its original name (`str`) so the callable's
        signature is unchanged; it shadows the builtin inside this function.
        """
        global error_skip_flag

        if "IMPACT-PACK-SIGNAL: STOP CONTROL BRIDGE" in str:
            return True

        if error_skip_flag and "ERROR:root:!!! Exception during processing !!!\n" == str:
            # Consume the flag: only a single error line is collapsed.
            error_skip_flag = False
            return True

        return False

    cm_global.try_call(api='cm.register_message_collapse', f=filter_message)

except Exception:
    # Best effort: the filter is simply not registered when cm_global is
    # missing or the registration call fails.
    print("[WARN] ComfyUI-Impact-Pack: `ComfyUI` or `ComfyUI-Manager` is an outdated version.")
def workflow_to_map(workflow):
    """Index a workflow dict's links and nodes for direct lookup.

    Returns `(nodes, links)` where `nodes` maps `str(node['id'])` to the node
    dict, and `links` maps each link's first element to the rest of that link
    (`link[1:]`). Later duplicates overwrite earlier ones.
    """
    link_index = {entry[0]: entry[1:] for entry in workflow['links']}
    node_index = {str(entry['id']): entry for entry in workflow['nodes']}
    return node_index, link_index
class ImpactRemoteBoolean:
    """Output node holding a target node id, widget name and BOOLEAN value.

    `doit` performs no work on the Python side.
    """

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = ()
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: target node id, widget name, and the boolean value."""
        required = {
            "node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "widget_name": ("STRING", {"multiline": False}),
            "value": ("BOOLEAN", {"default": True, "label_on": "True", "label_off": "False"}),
        }
        return {"required": required}

    def doit(self, **kwargs):
        """Accept any keyword arguments and return an empty dict."""
        return {}
class ImpactRemoteInt:
    """Output node holding a target node id, widget name and INT value.

    `doit` performs no work on the Python side.
    """

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic/_for_test"
    RETURN_TYPES = ()
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: target node id, widget name, and the signed int value."""
        required = {
            "node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "widget_name": ("STRING", {"multiline": False}),
            "value": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff}),
        }
        return {"required": required}

    def doit(self, **kwargs):
        """Accept any keyword arguments and return an empty dict."""
        return {}
class ImpactControlBridge:
    """Pass a value through, halt execution, or toggle the state of downstream nodes.

    With behavior "Stop" the node either forwards `value` (mode on) or blocks
    execution (mode off). With "Mute"/"Bypass" it inspects the workflow,
    collects the nodes reached from its first output, and asks the frontend to
    switch those that are not yet in the requested state.
    """

    # Node `mode` codes read from the serialized workflow, named after the
    # state each one is treated as in `doit`.
    _MODE_ACTIVE = 0
    _MODE_MUTE = 2
    _MODE_BYPASS = 4

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: value, on/off mode, behavior, and hidden workflow context."""
        return {
            "required": {
                "value": (any_typ,),
                "mode": ("BOOLEAN", {"default": True, "label_on": "Active", "label_off": "Stop/Mute/Bypass"}),
                "behavior": (["Stop", "Mute", "Bypass"], ),
            },
            "hidden": {"unique_id": "UNIQUE_ID", "prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}
        }

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Logic"
    RETURN_TYPES = (any_typ,)
    RETURN_NAMES = ("value",)
    OUTPUT_NODE = True

    DESCRIPTION = ("When behavior is Stop and mode is active, the input value is passed directly to the output.\n"
                   "When behavior is Mute/Bypass and mode is active, the node connected to the output is changed to active state.\n"
                   "When behavior is Stop and mode is Stop/Mute/Bypass, the workflow execution of the current node is halted.\n"
                   "When behavior is Mute/Bypass and mode is Stop/Mute/Bypass, the node connected to the output is changed to Mute/Bypass state.")

    @staticmethod
    def _output_link_ids(node):
        """Return the link ids on the node's first output, treating a missing list (None) as empty."""
        return node['outputs'][0]['links'] or []

    @classmethod
    def IS_CHANGED(cls, value, mode, behavior="Stop", unique_id=None, prompt=None, extra_pnginfo=None):
        """Return a change key for this node.

        For "Stop" the key is `(value, mode, behavior)`. Otherwise it is the
        list of non-reroute nodes reached from the first output, or 0 when
        the workflow cannot be read from `core.current_prompt`.
        """
        if behavior == "Stop":
            return value, mode, behavior

        # NOTE: extra_pnginfo is not populated for IS_CHANGED,
        #       so the workflow is taken from core.current_prompt instead.
        try:
            workflow = core.current_prompt['extra_data']['extra_pnginfo']['workflow']
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit propagate.
            print("[Impact Pack] core.current_prompt['extra_data']['extra_pnginfo']['workflow']")
            return 0

        workflow_nodes, links = workflow_to_map(workflow)
        next_nodes = []

        for link in cls._output_link_ids(workflow_nodes[unique_id]):
            node_id = str(links[link][2])
            impact.utils.collect_non_reroute_nodes(workflow_nodes, links, next_nodes, node_id)

        return next_nodes

    def doit(self, value, mode, behavior="Stop", unique_id=None, prompt=None, extra_pnginfo=None):
        """Apply the selected behavior and return `(value,)` or a blocked output.

        Raises:
            Exception: on a ComfyUI without the supported execution model,
                when behavior is "Stop" and `mode` is off. The message starts
                with the "STOP CONTROL BRIDGE" signal that the registered
                message filter collapses.
        """
        global error_skip_flag

        if core.is_execution_model_version_supported():
            from comfy_execution.graph import ExecutionBlocker
        else:
            # Previously this name was left undefined, so the Stop path
            # crashed with NameError instead of halting cleanly.
            ExecutionBlocker = None
            print("[Impact Pack] ImpactControlBridge: ComfyUI is outdated. The 'Stop' behavior cannot function properly.")

        if behavior == "Stop":
            if mode:
                return (value, )

            if ExecutionBlocker is None:
                # Let the message filter also collapse the generic error line.
                error_skip_flag = True
                raise Exception("IMPACT-PACK-SIGNAL: STOP CONTROL BRIDGE\n"
                                "Execution was halted by ImpactControlBridge. "
                                "Update ComfyUI to stop without raising an exception.")

            return (ExecutionBlocker(None), )

        workflow_nodes, links = workflow_to_map(extra_pnginfo['workflow'])

        # Downstream node ids bucketed by their current state; nodes with any
        # other mode code are ignored, as before.
        by_mode = {self._MODE_ACTIVE: [], self._MODE_MUTE: [], self._MODE_BYPASS: []}

        for link in self._output_link_ids(workflow_nodes[unique_id]):
            node_id = str(links[link][2])

            next_nodes = []
            impact.utils.collect_non_reroute_nodes(workflow_nodes, links, next_nodes, node_id)

            for next_node_id in next_nodes:
                bucket = by_mode.get(workflow_nodes[next_node_id]['mode'])
                if bucket is not None:
                    bucket.append(next_node_id)

        active_nodes = by_mode[self._MODE_ACTIVE]
        mute_nodes = by_mode[self._MODE_MUTE]
        bypass_nodes = by_mode[self._MODE_BYPASS]

        if mode:
            payload_key, targets = 'actives', mute_nodes + bypass_nodes
        elif behavior == "Mute" or behavior == True:  # noqa: E712 - presumably legacy boolean behavior values; confirm
            payload_key, targets = 'mutes', active_nodes + bypass_nodes
        else:
            payload_key, targets = 'bypasses', active_nodes + mute_nodes

        if targets:
            # Ask the frontend to switch the nodes, then interrupt the run.
            PromptServer.instance.send_sync("impact-bridge-continue", {"node_id": unique_id, payload_key: targets})
            nodes.interrupt_processing()

        return (value, )
class ImpactExecutionOrderController:
    """Pass `signal` and `value` through unchanged as two outputs."""

    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Util"
    RETURN_TYPES = (any_typ, any_typ)
    RETURN_NAMES = ("signal", "value")

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: two required inputs of any type."""
        required = {
            "signal": (any_typ,),
            "value": (any_typ,),
        }
        return {"required": required}

    def doit(self, signal, value):
        """Return `(signal, value)` untouched."""
        return (signal, value)
# Reference to PromptExecutor.handle_execution_error captured when this module
# is imported.
# NOTE(review): nothing in the visible part of the file reads this name — confirm usage.
original_handle_execution = execution.PromptExecutor.handle_execution_error


def handle_execution_error(**kwargs):
    """Forward all keyword arguments to `execution.PromptExecutor.handle_execution_error`.

    The attribute is looked up on the class at call time, not taken from
    `original_handle_execution`.
    """
    execution.PromptExecutor.handle_execution_error(**kwargs)