# Spaces: Sleeping  (page-header residue from extraction; not part of the module)
from comfy_execution.graph_utils import GraphBuilder | |
from .tools import VariantSupport | |
class TestAccumulateNode:
    """Test node that appends a single value to an accumulation.

    An accumulation is a dict of the form ``{"accum": [...]}``.
    """

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: required ``to_add``, optional ``accumulation``."""
        return {
            "required": {
                "to_add": ("*",),
            },
            "optional": {
                "accumulation": ("ACCUMULATION",),
            },
        }

    RETURN_TYPES = ("ACCUMULATION",)
    FUNCTION = "accumulate"

    CATEGORY = "Testing/Lists"

    def accumulate(self, to_add, accumulation=None):
        """Return a new accumulation with ``to_add`` appended.

        Args:
            to_add: The value to append.
            accumulation: Existing ``{"accum": [...]}`` dict, or ``None`` to
                start a fresh accumulation.

        Returns:
            A 1-tuple holding the new accumulation dict. The input
            accumulation is not mutated (a new list is built).
        """
        if accumulation is None:
            value = [to_add]
        else:
            # List concatenation creates a new list, leaving the input intact.
            value = accumulation["accum"] + [to_add]
        return ({"accum": value},)
class TestAccumulationHeadNode:
    """Test node that splits an accumulation into its first item and the rest."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a single required ``accumulation``."""
        return {
            "required": {
                "accumulation": ("ACCUMULATION",),
            },
        }

    RETURN_TYPES = ("ACCUMULATION", "*",)
    FUNCTION = "accumulation_head"

    CATEGORY = "Testing/Lists"

    def accumulation_head(self, accumulation):
        """Pop the first element off an accumulation.

        Args:
            accumulation: A ``{"accum": [...]}`` dict.

        Returns:
            ``(remaining_accumulation, first_item)``. For an empty
            accumulation, the input is returned unchanged with ``None`` as
            the item.
        """
        accum = accumulation["accum"]
        if not accum:
            return (accumulation, None)
        return ({"accum": accum[1:]}, accum[0])
class TestAccumulationTailNode:
    """Test node that splits an accumulation into its last item and the rest."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a single required ``accumulation``."""
        return {
            "required": {
                "accumulation": ("ACCUMULATION",),
            },
        }

    RETURN_TYPES = ("ACCUMULATION", "*",)
    FUNCTION = "accumulation_tail"

    CATEGORY = "Testing/Lists"

    def accumulation_tail(self, accumulation):
        """Pop the last element off an accumulation.

        Args:
            accumulation: A ``{"accum": [...]}`` dict.

        Returns:
            ``(remaining_accumulation, last_item)``. For an empty
            accumulation, the input is returned unchanged with ``None`` as
            the item.
        """
        accum = accumulation["accum"]
        if not accum:
            # Keep the (ACCUMULATION, item) order declared in RETURN_TYPES;
            # the accumulation comes first, the (absent) item second.
            return (accumulation, None)
        return ({"accum": accum[:-1]}, accum[-1])
class TestAccumulationToListNode:
    """Test node that exposes the contents of an accumulation as a list output."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a single required ``accumulation``."""
        return {
            "required": {
                "accumulation": ("ACCUMULATION",),
            },
        }

    RETURN_TYPES = ("*",)
    # Marks the single output as a list output.
    OUTPUT_IS_LIST = (True,)

    FUNCTION = "accumulation_to_list"

    CATEGORY = "Testing/Lists"

    def accumulation_to_list(self, accumulation):
        """Return the accumulation's underlying list as a 1-tuple.

        The list is returned as-is (not copied).
        """
        return (accumulation["accum"],)
class TestListToAccumulationNode:
    """Test node that wraps a list input into an accumulation."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a single required ``list``."""
        return {
            "required": {
                "list": ("*",),
            },
        }

    RETURN_TYPES = ("ACCUMULATION",)
    INPUT_IS_LIST = (True,)

    FUNCTION = "list_to_accumulation"

    CATEGORY = "Testing/Lists"

    # NOTE: the parameter shadows the ``list`` builtin, but its name must match
    # the "list" input declared in INPUT_TYPES, so it cannot be renamed.
    def list_to_accumulation(self, list):
        """Return ``{"accum": list}`` as a 1-tuple; the list is not copied."""
        return ({"accum": list},)
class TestAccumulationGetLengthNode:
    """Test node that reports the number of items in an accumulation."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: a single required ``accumulation``."""
        return {
            "required": {
                "accumulation": ("ACCUMULATION",),
            },
        }

    RETURN_TYPES = ("INT",)

    FUNCTION = "accumlength"

    CATEGORY = "Testing/Lists"

    def accumlength(self, accumulation):
        """Return the length of ``accumulation["accum"]`` as a 1-tuple."""
        return (len(accumulation['accum']),)
class TestAccumulationGetItemNode:
    """Test node that reads one item from an accumulation by index."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: required ``accumulation`` and ``index``."""
        return {
            "required": {
                "accumulation": ("ACCUMULATION",),
                "index": ("INT", {"default":0, "step":1})
            },
        }

    RETURN_TYPES = ("*",)

    FUNCTION = "get_item"

    CATEGORY = "Testing/Lists"

    def get_item(self, accumulation, index):
        """Return ``accumulation["accum"][index]`` as a 1-tuple.

        Normal sequence indexing applies: negative indices count from the end
        and an out-of-range index raises ``IndexError``.
        """
        return (accumulation['accum'][index],)
class TestAccumulationSetItemNode:
    """Test node that replaces one item of an accumulation by index."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: required ``accumulation``, ``index``, ``value``."""
        return {
            "required": {
                "accumulation": ("ACCUMULATION",),
                "index": ("INT", {"default":0, "step":1}),
                "value": ("*",),
            },
        }

    RETURN_TYPES = ("ACCUMULATION",)

    FUNCTION = "set_item"

    CATEGORY = "Testing/Lists"

    def set_item(self, accumulation, index, value):
        """Return a new accumulation with position ``index`` set to ``value``.

        The input accumulation is left untouched. An out-of-range index
        raises ``IndexError``.
        """
        # Shallow-copy first so the caller's accumulation is not mutated.
        new_accum = accumulation['accum'][:]
        new_accum[index] = value
        return ({"accum": new_accum},)
class TestIntMathOperation:
    """Test node that applies a binary integer operation to ``a`` and ``b``."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: integers ``a`` and ``b`` plus an ``operation`` choice."""
        return {
            "required": {
                "a": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
                "b": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
                "operation": (["add", "subtract", "multiply", "divide", "modulo", "power"],),
            },
        }

    RETURN_TYPES = ("INT",)
    FUNCTION = "int_math_operation"

    CATEGORY = "Testing/Logic"

    def int_math_operation(self, a, b, operation):
        """Compute ``a <operation> b`` and return it as a 1-tuple.

        Args:
            a: Left operand.
            b: Right operand.
            operation: One of "add", "subtract", "multiply", "divide",
                "modulo", "power". "divide" is floor division.

        Returns:
            A 1-tuple holding the result.

        Raises:
            ValueError: If ``operation`` is not one of the supported names
                (previously this silently returned ``None``).
            ZeroDivisionError: For "divide" or "modulo" with ``b == 0``.
        """
        if operation == "add":
            result = a + b
        elif operation == "subtract":
            result = a - b
        elif operation == "multiply":
            result = a * b
        elif operation == "divide":
            result = a // b
        elif operation == "modulo":
            result = a % b
        elif operation == "power":
            result = a ** b
        else:
            raise ValueError(f"Unsupported operation: {operation!r}")
        return (result,)
from .flow_control import NUM_FLOW_SOCKETS | |
class TestForLoopOpen:
    """Test node that opens a counted loop by expanding into a while-loop node."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec.

        ``remaining`` is required; ``initial_value1`` .. ``initial_value{N-1}``
        are optional (N = NUM_FLOW_SOCKETS); ``initial_value0`` is hidden.
        """
        return {
            "required": {
                "remaining": ("INT", {"default": 1, "min": 0, "max": 100000, "step": 1}),
            },
            "optional": {
                f"initial_value{i}": ("*",) for i in range(1, NUM_FLOW_SOCKETS)
            },
            "hidden": {
                "initial_value0": ("*",)
            }
        }

    RETURN_TYPES = tuple(["FLOW_CONTROL", "INT",] + ["*"] * (NUM_FLOW_SOCKETS-1))
    RETURN_NAMES = tuple(["flow_control", "remaining"] + [f"value{i}" for i in range(1, NUM_FLOW_SOCKETS)])
    FUNCTION = "for_loop_open"

    CATEGORY = "Testing/Flow"

    def for_loop_open(self, remaining, **kwargs):
        """Build the loop-opening subgraph and return the initial outputs.

        Args:
            remaining: Iteration count; overridden by ``initial_value0`` when
                that hidden input is supplied.
            **kwargs: Optional ``initial_value{i}`` inputs; missing ones are
                treated as ``None``.

        Returns:
            A dict with ``"result"`` (a tuple of the placeholder ``"stub"``,
            the remaining count and the pass-through values) and ``"expand"``
            (the finalized graph).
        """
        graph = GraphBuilder()
        # The hidden socket, when present, takes precedence over ``remaining``.
        if "initial_value0" in kwargs:
            remaining = kwargs["initial_value0"]
        # The returned node handle is not needed here; the call is kept for
        # its effect on ``graph``.
        graph.node("TestWhileLoopOpen", condition=remaining, initial_value0=remaining, **{(f"initial_value{i}"): kwargs.get(f"initial_value{i}", None) for i in range(1, NUM_FLOW_SOCKETS)})
        outputs = [kwargs.get(f"initial_value{i}", None) for i in range(1, NUM_FLOW_SOCKETS)]
        return {
            "result": tuple(["stub", remaining] + outputs),
            "expand": graph.finalize(),
        }
class TestForLoopClose:
    """Test node that closes a counted loop by expanding into a subgraph.

    The subgraph decrements the counter, converts it to a boolean condition
    and feeds both into a ``TestWhileLoopClose`` node.
    """

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec.

        ``flow_control`` is required; ``initial_value1`` ..
        ``initial_value{N-1}`` are optional (N = NUM_FLOW_SOCKETS). All are
        declared with ``rawLink``.
        """
        return {
            "required": {
                "flow_control": ("FLOW_CONTROL", {"rawLink": True}),
            },
            "optional": {
                f"initial_value{i}": ("*",{"rawLink": True}) for i in range(1, NUM_FLOW_SOCKETS)
            },
        }

    RETURN_TYPES = tuple(["*"] * (NUM_FLOW_SOCKETS-1))
    RETURN_NAMES = tuple([f"value{i}" for i in range(1, NUM_FLOW_SOCKETS)])
    FUNCTION = "for_loop_close"

    CATEGORY = "Testing/Flow"

    def for_loop_close(self, flow_control, **kwargs):
        """Build the loop-closing subgraph.

        Args:
            flow_control: Indexable value whose first element identifies the
                opening node (presumably a raw ``[node_id, output_index]``
                link - confirm against the executor).
            **kwargs: Optional ``initial_value{i}`` inputs; missing ones are
                passed on as ``None``.

        Returns:
            A dict with ``"result"`` (outputs 1..N-1 of the while-close node)
            and ``"expand"`` (the finalized graph).
        """
        graph = GraphBuilder()
        while_open = flow_control[0]
        # [while_open, 1] refers to output 1 of the opening node, which is
        # named "remaining" in TestForLoopOpen.RETURN_NAMES; subtract one.
        sub = graph.node("TestIntMathOperation", operation="subtract", a=[while_open,1], b=1)
        cond = graph.node("TestToBoolNode", value=sub.out(0))
        input_values = {f"initial_value{i}": kwargs.get(f"initial_value{i}", None) for i in range(1, NUM_FLOW_SOCKETS)}
        while_close = graph.node("TestWhileLoopClose",
                flow_control=flow_control,
                condition=cond.out(0),
                initial_value0=sub.out(0),
                **input_values)
        return {
            "result": tuple([while_close.out(i) for i in range(1, NUM_FLOW_SOCKETS)]),
            "expand": graph.finalize(),
        }
# Upper bound (exclusive) on the numeric suffix of the ``value{i}`` inputs.
NUM_LIST_SOCKETS = 10


class TestMakeListNode:
    """Test node that gathers its ``value{i}`` inputs into a single list."""

    def __init__(self):
        pass

    # INPUT_TYPES takes ``cls`` and must be callable on the class itself,
    # so it has to be a classmethod.
    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec.

        ``value1`` is required; ``value2`` .. ``value{N-1}`` are optional
        (N = NUM_LIST_SOCKETS).
        """
        return {
            "required": {
                "value1": ("*",),
            },
            "optional": {
                # Start at 2: "value1" is already declared as required and
                # must not also be listed as optional.
                f"value{i}": ("*",) for i in range(2, NUM_LIST_SOCKETS)
            },
        }

    RETURN_TYPES = ("*",)
    FUNCTION = "make_list"
    OUTPUT_IS_LIST = (True,)

    CATEGORY = "Testing/Lists"

    def make_list(self, **kwargs):
        """Collect the supplied ``value{i}`` keyword arguments in index order.

        Only names ``value0`` .. ``value{N-1}`` are considered; any that are
        absent are skipped.

        Returns:
            A 1-tuple holding the list of collected values.
        """
        names = (f"value{i}" for i in range(NUM_LIST_SOCKETS))
        return ([kwargs[name] for name in names if name in kwargs],)
# Registry of node identifier -> implementing class for the utility test nodes.
UTILITY_NODE_CLASS_MAPPINGS = dict(
    TestAccumulateNode=TestAccumulateNode,
    TestAccumulationHeadNode=TestAccumulationHeadNode,
    TestAccumulationTailNode=TestAccumulationTailNode,
    TestAccumulationToListNode=TestAccumulationToListNode,
    TestListToAccumulationNode=TestListToAccumulationNode,
    TestAccumulationGetLengthNode=TestAccumulationGetLengthNode,
    TestAccumulationGetItemNode=TestAccumulationGetItemNode,
    TestAccumulationSetItemNode=TestAccumulationSetItemNode,
    TestForLoopOpen=TestForLoopOpen,
    TestForLoopClose=TestForLoopClose,
    TestIntMathOperation=TestIntMathOperation,
    TestMakeListNode=TestMakeListNode,
)
# Registry of node identifier -> human-readable display name.
UTILITY_NODE_DISPLAY_NAME_MAPPINGS = dict(
    TestAccumulateNode="Accumulate",
    TestAccumulationHeadNode="Accumulation Head",
    TestAccumulationTailNode="Accumulation Tail",
    TestAccumulationToListNode="Accumulation to List",
    TestListToAccumulationNode="List to Accumulation",
    TestAccumulationGetLengthNode="Accumulation Get Length",
    TestAccumulationGetItemNode="Accumulation Get Item",
    TestAccumulationSetItemNode="Accumulation Set Item",
    TestForLoopOpen="For Loop Open",
    TestForLoopClose="For Loop Close",
    TestIntMathOperation="Int Math Operation",
    TestMakeListNode="Make List",
)