Spaces:
Running
Running
from ..utils import common_annotator_call | |
import comfy.model_management as model_management | |
import torch | |
import numpy as np | |
from einops import rearrange | |
import torch.nn.functional as F | |
class Unimatch_OptFlowPreprocessor: | |
def INPUT_TYPES(s): | |
return { | |
"required": dict( | |
image=("IMAGE",), | |
ckpt_name=( | |
["gmflow-scale1-mixdata.pth", "gmflow-scale2-mixdata.pth", "gmflow-scale2-regrefine6-mixdata.pth"], | |
{"default": "gmflow-scale2-regrefine6-mixdata.pth"} | |
), | |
backward_flow=("BOOLEAN", {"default": False}), | |
bidirectional_flow=("BOOLEAN", {"default": False}) | |
) | |
} | |
RETURN_TYPES = ("OPTICAL_FLOW", "IMAGE") | |
RETURN_NAMES = ("OPTICAL_FLOW", "PREVIEW_IMAGE") | |
FUNCTION = "estimate" | |
CATEGORY = "ControlNet Preprocessors/Optical Flow" | |
def estimate(self, image, ckpt_name, backward_flow=False, bidirectional_flow=False): | |
assert len(image) > 1, "[Unimatch] Requiring as least two frames as an optical flow estimator. Only use this node on video input." | |
from custom_controlnet_aux.unimatch import UnimatchDetector | |
tensor_images = image | |
model = UnimatchDetector.from_pretrained(filename=ckpt_name).to(model_management.get_torch_device()) | |
flows, vis_flows = [], [] | |
for i in range(len(tensor_images) - 1): | |
image0, image1 = np.asarray(image[i:i+2].cpu() * 255., dtype=np.uint8) | |
flow, vis_flow = model(image0, image1, output_type="np", pred_bwd_flow=backward_flow, pred_bidir_flow=bidirectional_flow) | |
flows.append(torch.from_numpy(flow).float()) | |
vis_flows.append(torch.from_numpy(vis_flow).float() / 255.) | |
del model | |
return (torch.stack(flows, dim=0), torch.stack(vis_flows, dim=0)) | |
class MaskOptFlow: | |
def INPUT_TYPES(s): | |
return { | |
"required": dict(optical_flow=("OPTICAL_FLOW",), mask=("MASK",)) | |
} | |
RETURN_TYPES = ("OPTICAL_FLOW", "IMAGE") | |
RETURN_NAMES = ("OPTICAL_FLOW", "PREVIEW_IMAGE") | |
FUNCTION = "mask_opt_flow" | |
CATEGORY = "ControlNet Preprocessors/Optical Flow" | |
def mask_opt_flow(self, optical_flow, mask): | |
from custom_controlnet_aux.unimatch import flow_to_image | |
assert len(mask) >= len(optical_flow), f"Not enough masks to mask optical flow: {len(mask)} vs {len(optical_flow)}" | |
mask = mask[:optical_flow.shape[0]] | |
mask = F.interpolate(mask, optical_flow.shape[1:3]) | |
mask = rearrange(mask, "n 1 h w -> n h w 1") | |
vis_flows = torch.stack([torch.from_numpy(flow_to_image(flow)).float() / 255. for flow in optical_flow.numpy()], dim=0) | |
vis_flows *= mask | |
optical_flow *= mask | |
return (optical_flow, vis_flows) | |
NODE_CLASS_MAPPINGS = { | |
"Unimatch_OptFlowPreprocessor": Unimatch_OptFlowPreprocessor, | |
"MaskOptFlow": MaskOptFlow | |
} | |
NODE_DISPLAY_NAME_MAPPINGS = { | |
"Unimatch_OptFlowPreprocessor": "Unimatch Optical Flow", | |
"MaskOptFlow": "Mask Optical Flow (DragNUWA)" | |
} |