Spaces:
Running
Running
File size: 3,207 Bytes
681fa96 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
from ..utils import common_annotator_call
import comfy.model_management as model_management
import torch
import numpy as np
from einops import rearrange
import torch.nn.functional as F
class Unimatch_OptFlowPreprocessor:
@classmethod
def INPUT_TYPES(s):
return {
"required": dict(
image=("IMAGE",),
ckpt_name=(
["gmflow-scale1-mixdata.pth", "gmflow-scale2-mixdata.pth", "gmflow-scale2-regrefine6-mixdata.pth"],
{"default": "gmflow-scale2-regrefine6-mixdata.pth"}
),
backward_flow=("BOOLEAN", {"default": False}),
bidirectional_flow=("BOOLEAN", {"default": False})
)
}
RETURN_TYPES = ("OPTICAL_FLOW", "IMAGE")
RETURN_NAMES = ("OPTICAL_FLOW", "PREVIEW_IMAGE")
FUNCTION = "estimate"
CATEGORY = "ControlNet Preprocessors/Optical Flow"
def estimate(self, image, ckpt_name, backward_flow=False, bidirectional_flow=False):
assert len(image) > 1, "[Unimatch] Requiring as least two frames as an optical flow estimator. Only use this node on video input."
from custom_controlnet_aux.unimatch import UnimatchDetector
tensor_images = image
model = UnimatchDetector.from_pretrained(filename=ckpt_name).to(model_management.get_torch_device())
flows, vis_flows = [], []
for i in range(len(tensor_images) - 1):
image0, image1 = np.asarray(image[i:i+2].cpu() * 255., dtype=np.uint8)
flow, vis_flow = model(image0, image1, output_type="np", pred_bwd_flow=backward_flow, pred_bidir_flow=bidirectional_flow)
flows.append(torch.from_numpy(flow).float())
vis_flows.append(torch.from_numpy(vis_flow).float() / 255.)
del model
return (torch.stack(flows, dim=0), torch.stack(vis_flows, dim=0))
class MaskOptFlow:
@classmethod
def INPUT_TYPES(s):
return {
"required": dict(optical_flow=("OPTICAL_FLOW",), mask=("MASK",))
}
RETURN_TYPES = ("OPTICAL_FLOW", "IMAGE")
RETURN_NAMES = ("OPTICAL_FLOW", "PREVIEW_IMAGE")
FUNCTION = "mask_opt_flow"
CATEGORY = "ControlNet Preprocessors/Optical Flow"
def mask_opt_flow(self, optical_flow, mask):
from custom_controlnet_aux.unimatch import flow_to_image
assert len(mask) >= len(optical_flow), f"Not enough masks to mask optical flow: {len(mask)} vs {len(optical_flow)}"
mask = mask[:optical_flow.shape[0]]
mask = F.interpolate(mask, optical_flow.shape[1:3])
mask = rearrange(mask, "n 1 h w -> n h w 1")
vis_flows = torch.stack([torch.from_numpy(flow_to_image(flow)).float() / 255. for flow in optical_flow.numpy()], dim=0)
vis_flows *= mask
optical_flow *= mask
return (optical_flow, vis_flows)
NODE_CLASS_MAPPINGS = {
"Unimatch_OptFlowPreprocessor": Unimatch_OptFlowPreprocessor,
"MaskOptFlow": MaskOptFlow
}
NODE_DISPLAY_NAME_MAPPINGS = {
"Unimatch_OptFlowPreprocessor": "Unimatch Optical Flow",
"MaskOptFlow": "Mask Optical Flow (DragNUWA)"
} |