Spaces:
Running
on
Zero
Running
on
Zero
File size: 7,892 Bytes
ec2cf52 6eed647 09954f9 ec2cf52 3f5fcc5 ec2cf52 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
import gradio as gr
import spaces
import torch
from gradio_rerun import Rerun
import rerun as rr
import rerun.blueprint as rrb
from pathlib import Path
import uuid
from mini_dust3r.api import OptimizedResult, inferece_dust3r, log_optimized_result
from mini_dust3r.model import AsymmetricCroCo3DStereo
from mini_dust3r.utils.misc import (
fill_default_args,
freeze_all_params,
is_symmetrized,
interleave,
transpose_to_landscape,
)
import os
from mini_dust3r.model import load_model
from catmlp_dpt_head import Cat_MLP_LocalFeatures_DPT_Pts3d, postprocess
DEVICE = "cuda" if torch.cuda.is_available() else "CPU"
# model = AsymmetricCroCo3DStereo.from_pretrained(
# "naver/DUSt3R_ViTLarge_BaseDecoder_512_dpt"
# ).to(DEVICE)
from mini_dust3r.heads.linear_head import LinearPts3d
from mini_dust3r.heads.dpt_head import create_dpt_head
def head_factory(head_type, output_mode, net, has_conf=False):
"""" build a prediction head for the decoder
"""
if head_type == 'linear' and output_mode == 'pts3d':
return LinearPts3d(net, has_conf)
elif head_type == 'dpt' and output_mode == 'pts3d':
return create_dpt_head(net, has_conf=has_conf)
if head_type == 'catmlp+dpt' and output_mode.startswith('pts3d+desc'):
local_feat_dim = int(output_mode[10:])
assert net.dec_depth > 9
l2 = net.dec_depth
feature_dim = 256
last_dim = feature_dim // 2
out_nchan = 3
ed = net.enc_embed_dim
dd = net.dec_embed_dim
return Cat_MLP_LocalFeatures_DPT_Pts3d(net, local_feat_dim=local_feat_dim, has_conf=has_conf,
num_channels=out_nchan + has_conf,
feature_dim=feature_dim,
last_dim=last_dim,
hooks_idx=[0, l2 * 2 // 4, l2 * 3 // 4, l2],
dim_tokens=[ed, dd, dd, dd],
postprocess=postprocess,
depth_mode=net.depth_mode,
conf_mode=net.conf_mode,
head_type='regression')
else:
raise NotImplementedError(f"unexpected {head_type=} and {output_mode=}")
class AsymmetricMASt3R(AsymmetricCroCo3DStereo):
def __init__(self, desc_mode=('norm'), two_confs=False, desc_conf_mode=None, **kwargs):
self.desc_mode = desc_mode
self.two_confs = two_confs
self.desc_conf_mode = desc_conf_mode
super().__init__(**kwargs)
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, **kw):
if os.path.isfile(pretrained_model_name_or_path):
return load_model(pretrained_model_name_or_path, device='cpu')
else:
return super(AsymmetricMASt3R, cls).from_pretrained(pretrained_model_name_or_path, **kw)
def set_downstream_head(self, output_mode, head_type, landscape_only, depth_mode, conf_mode, patch_size, img_size, **kw):
assert img_size[0] % patch_size == 0 and img_size[
1] % patch_size == 0, f'{img_size=} must be multiple of {patch_size=}'
self.output_mode = output_mode
self.head_type = head_type
self.depth_mode = depth_mode
self.conf_mode = conf_mode
if self.desc_conf_mode is None:
self.desc_conf_mode = conf_mode
# allocate heads
self.downstream_head1 = head_factory(head_type, output_mode, self, has_conf=bool(conf_mode))
self.downstream_head2 = head_factory(head_type, output_mode, self, has_conf=bool(conf_mode))
# magic wrapper
self.head1 = transpose_to_landscape(self.downstream_head1, activate=landscape_only)
self.head2 = transpose_to_landscape(self.downstream_head2, activate=landscape_only)
model = AsymmetricMASt3R.from_pretrained(
"naver/MASt3R_ViTLarge_BaseDecoder_512_catmlpdpt_metric").to(DEVICE)
def create_blueprint(image_name_list: list[str], log_path: Path) -> rrb.Blueprint:
# dont show 2d views if there are more than 4 images as to not clutter the view
if len(image_name_list) > 4:
blueprint = rrb.Blueprint(
rrb.Horizontal(
rrb.Spatial3DView(origin=f"{log_path}"),
),
collapse_panels=True,
)
else:
blueprint = rrb.Blueprint(
rrb.Horizontal(
contents=[
rrb.Spatial3DView(origin=f"{log_path}"),
rrb.Vertical(
contents=[
rrb.Spatial2DView(
origin=f"{log_path}/camera_{i}/pinhole/",
contents=[
"+ $origin/**",
],
)
for i in range(len(image_name_list))
]
),
],
column_shares=[3, 1],
),
collapse_panels=True,
)
return blueprint
@spaces.GPU
def predict(image_name_list: list[str] | str):
# check if is list or string and if not raise error
if not isinstance(image_name_list, list) and not isinstance(image_name_list, str):
raise gr.Error(
f"Input must be a list of strings or a string, got: {type(image_name_list)}"
)
uuid_str = str(uuid.uuid4())
filename = Path(f"/tmp/gradio/{uuid_str}.rrd")
rr.init(f"{uuid_str}")
log_path = Path("world")
if isinstance(image_name_list, str):
image_name_list = [image_name_list]
optimized_results: OptimizedResult = inferece_dust3r(
image_dir_or_list=image_name_list,
model=model,
device=DEVICE,
batch_size=1,
)
blueprint: rrb.Blueprint = create_blueprint(image_name_list, log_path)
rr.send_blueprint(blueprint)
rr.set_time_sequence("sequence", 0)
log_optimized_result(optimized_results, log_path)
rr.save(filename.as_posix())
return filename.as_posix()
with gr.Blocks(
css=""".gradio-container {margin: 0 !important; min-width: 100%};""",
title="Mini-DUSt3R Demo",
) as demo:
# scene state is save so that you can change conf_thr, cam_size... without rerunning the inference
gr.HTML('<h2 style="text-align: center;">Mini-DUSt3R Demo</h2>')
gr.HTML(
'<p style="text-align: center;">Unofficial DUSt3R demo using the mini-dust3r pip package</p>'
)
gr.HTML(
'<p style="text-align: center;">More info <a href="https://github.com/pablovela5620/mini-dust3r">here</a></p>'
)
with gr.Tab(label="Single Image"):
with gr.Column():
single_image = gr.Image(type="filepath", height=300)
run_btn_single = gr.Button("Run")
rerun_viewer_single = Rerun(height=900)
run_btn_single.click(
fn=predict, inputs=[single_image], outputs=[rerun_viewer_single]
)
example_single_dir = Path("examples/single_image")
example_single_files = sorted(example_single_dir.glob("*.png"))
examples_single = gr.Examples(
examples=example_single_files,
inputs=[single_image],
outputs=[rerun_viewer_single],
fn=predict,
cache_examples="lazy",
)
with gr.Tab(label="Multi Image"):
with gr.Column():
multi_files = gr.File(file_count="multiple")
run_btn_multi = gr.Button("Run")
rerun_viewer_multi = Rerun(height=900)
run_btn_multi.click(
fn=predict, inputs=[multi_files], outputs=[rerun_viewer_multi]
)
demo.launch() |