Spaces:
Running
on
Zero
Running
on
Zero
import json | |
import numpy as np | |
import rerun as rr | |
import spaces | |
import gradio as gr | |
from gradio_rerun import Rerun | |
from scipy.spatial.transform import Rotation | |
import tempfile | |
import os | |
from typing import Optional, Dict, Any, List, Tuple | |
def vector3_to_numpy(vec):
    """Return the ``x``, ``y`` and ``z`` entries of a mapping as a 1-D array.

    The components are looked up by key, in x, y, z order; any other keys
    in ``vec`` are ignored. A missing component raises ``KeyError``.
    """
    components = [vec[axis] for axis in "xyz"]
    return np.array(components)
def euler_to_quaternion(euler, *, degrees=False):
    """Convert an Euler-angle mapping to a quaternion.

    Args:
        euler: Mapping with ``'x'``, ``'y'`` and ``'z'`` angle entries. They
            are applied with SciPy's lowercase ``'xyz'`` sequence (extrinsic
            rotations about x, then y, then z).
        degrees: When ``True`` the angles are interpreted as degrees.
            Defaults to ``False`` (radians), which is what this function
            has always assumed.
            NOTE(review): confirm which unit the simulation JSON uses.

    Returns:
        A length-4 array in scalar-last ``(x, y, z, w)`` order, as produced
        by ``Rotation.as_quat()``.

    Raises:
        KeyError: If one of the three components is missing.
    """
    angles = [euler['x'], euler['y'], euler['z']]
    return Rotation.from_euler('xyz', angles, degrees=degrees).as_quat()
def create_subject_mesh(subject):
    """Build an axis-aligned box mesh for one subject.

    Reads ``subject['position']`` and ``subject['size']``. A unit cube
    centred on the origin is scaled per axis by the size and then moved to
    the position.

    Returns:
        ``(vertices, faces)``: an 8x3 array of corner coordinates and a
        12x3 array of triangle vertex indices (two triangles per side).
    """
    center = vector3_to_numpy(subject['position'])
    extent = vector3_to_numpy(subject['size'])

    # Unit-cube corners: indices 0-3 lie on z = -0.5, indices 4-7 on z = +0.5.
    unit_corners = np.array([
        [-0.5, -0.5, -0.5],
        [0.5, -0.5, -0.5],
        [0.5, 0.5, -0.5],
        [-0.5, 0.5, -0.5],
        [-0.5, -0.5, 0.5],
        [0.5, -0.5, 0.5],
        [0.5, 0.5, 0.5],
        [-0.5, 0.5, 0.5],
    ])
    vertices = unit_corners * extent.reshape(1, 3) + center.reshape(1, 3)

    # Each side of the box is a pair of triangles sharing a diagonal.
    side_triangles = (
        ([0, 1, 2], [0, 2, 3]),  # front
        ([1, 5, 6], [1, 6, 2]),  # right
        ([5, 4, 7], [5, 7, 6]),  # back
        ([4, 0, 3], [4, 3, 7]),  # left
        ([3, 2, 6], [3, 6, 7]),  # top
        ([4, 5, 1], [4, 1, 0]),  # bottom
    )
    faces = np.array(side_triangles).reshape(-1, 3)

    return vertices, faces
def log_simulation(simulation_data: Dict[str, Any]) -> None:
    """Write one simulation into a Rerun recording named "camera_simulation".

    Reads ``simulation_data['subjects']``, ``['cameraFrames']`` and
    ``['instructions']`` (plus the optional ``'selectedSubject'`` index) and
    logs, in this order:

    * a timeless text summary of every instruction,
    * the world coordinate convention (right-handed, Y up),
    * one timeless box mesh and class label per subject,
    * the full camera path as a timeless point cloud,
    * per frame on the "frame" timeline: the camera transform, a pinhole
      frustum and a "Frame: i/N" text marker.
    """
    rr.init("camera_simulation")

    subjects = simulation_data['subjects']
    camera_frames = simulation_data['cameraFrames']
    instructions = simulation_data['instructions']

    # Instruction summary: one text block per instruction, newline-separated.
    instruction_blocks = []
    for number, inst in enumerate(instructions, start=1):
        instruction_blocks.append(
            f"Instruction {number}:\n"
            f" Movement: {inst['cameraMovement']}\n"
            f" Easing: {inst['movementEasing']}\n"
            f" Frames: {inst['frameCount']}\n"
            f" Camera Angle: {inst.get('initialCameraAngle', 'N/A')}\n"
            f" Shot Type: {inst.get('initialShotType', 'N/A')}\n"
            f" Subject Index: {inst.get('subjectIndex', 'N/A')}"
        )
    instructions_doc = rr.TextDocument("\n".join(instruction_blocks))
    rr.log("metadata/instructions", instructions_doc, timeless=True)

    # World coordinate convention.
    rr.log("world", rr.ViewCoordinates.RIGHT_HAND_Y_UP, timeless=True)

    # Subjects are drawn as boxes; the selected one gets the highlight colour.
    for idx, subject in enumerate(subjects):
        vertices, faces = create_subject_mesh(subject)
        if idx == simulation_data.get('selectedSubject'):
            subject_color = [0.8, 0.2, 0.2, 1.0]
        else:
            subject_color = [0.8, 0.8, 0.8, 1.0]

        # The same RGBA value is repeated once per vertex.
        subject_mesh = rr.Mesh3D(
            vertex_positions=vertices,
            indices=faces,
            colors=np.tile(subject_color, (len(vertices), 1)),
        )
        rr.log(f"world/subject_{idx}", subject_mesh, timeless=True)

        class_doc = rr.TextDocument(subject['objectClass'])
        rr.log(f"world/subject_{idx}/class", class_doc, timeless=True)

    # Whole camera path as cyan points, visible at every time step.
    camera_positions = np.array(
        [vector3_to_numpy(frame['position']) for frame in camera_frames]
    )
    trajectory_colors = np.full(
        (len(camera_positions), 4), [0.0, 0.8, 0.8, 1.0]
    )
    rr.log(
        "world/camera_trajectory",
        rr.Points3D(camera_positions, colors=trajectory_colors),
        timeless=True,
    )

    # Animated camera: one step on the "frame" timeline per camera frame.
    total_frames = len(camera_frames)
    for frame_idx, camera_frame in enumerate(camera_frames):
        rr.set_time_sequence("frame", frame_idx)

        position = vector3_to_numpy(camera_frame['position'])
        rotation_q = euler_to_quaternion(camera_frame['angle'])

        camera_pose = rr.Transform3D(
            translation=position,
            rotation=rr.Quaternion(xyzw=rotation_q),
        )
        rr.log("world/camera", camera_pose)

        # Frustum for a fixed 1920x1080 image at the frame's focal length.
        frustum = rr.Pinhole(
            focal_length=camera_frame['focalLength'],
            width=1920,
            height=1080,
        )
        rr.log("world/camera/view", frustum)

        frame_label = rr.TextDocument(f"Frame: {frame_idx + 1}/{total_frames}")
        rr.log("metadata/current_frame", frame_label)
def load_simulation_data(file) -> Tuple[Optional[List[Dict[str, Any]]], Optional[List[str]]]:
    """Load simulations and a one-line description of each from a JSON file.

    Args:
        file: ``None``, a filesystem path (``str`` or ``os.PathLike``), or
            an object whose ``name`` attribute holds the path (for example
            an uploaded-file wrapper or an open file object).

    Returns:
        ``(simulations, descriptions)`` where ``simulations`` is the value
        of the top-level ``"simulations"`` key and ``descriptions[i]``
        summarises the subject and instruction counts of simulation ``i``.
        Returns ``(None, None)`` when ``file`` is ``None`` or when the file
        cannot be read, parsed, or lacks the expected keys; in the failure
        case the error is printed rather than raised.
    """
    if file is None:
        return None, None

    try:
        # A plain path is used as-is. ``pathlib.Path.name`` is only the
        # final path component, so path-like objects must not go through
        # ``.name``; every other object is expected to expose its path there.
        if isinstance(file, (str, os.PathLike)):
            path = file
        else:
            path = file.name

        # ``with`` closes the handle even if parsing fails; JSON interchange
        # text is UTF-8, so do not depend on the platform default encoding.
        with open(path, encoding="utf-8") as handle:
            json_data = json.load(handle)

        simulations = json_data['simulations']
        # Create descriptions for each simulation
        descriptions = [
            f"Simulation {i}: {len(sim['subjects'])} subjects, {len(sim['instructions'])} instructions"
            for i, sim in enumerate(simulations)
        ]
    except Exception as e:
        # UI boundary: report and degrade to "nothing loaded" instead of
        # propagating the failure into the event handler.
        print(f"Error loading simulation data: {str(e)}")
        return None, None

    return simulations, descriptions
def visualize_simulation(file, simulation_index: Optional[int]) -> Optional[str]:
    """Log the selected simulation to Rerun and save it as an ``.rrd`` file.

    Args:
        file: Uploaded simulation file, forwarded to
            ``load_simulation_data``; ``None`` when nothing is uploaded.
        simulation_index: Zero-based position of the chosen simulation, or
            ``None`` when no dropdown entry is selected (the dropdown is
            reset with ``value=None`` whenever a new file is uploaded).

    Returns:
        Path to ``simulation.rrd`` inside a freshly created temporary
        directory, or ``None`` when there is nothing to show: no file, no
        selection, an index outside the loaded simulations, or any error
        while loading or logging (the error is printed).
    """
    # Guard before any work: comparing ``None`` with an int would raise and
    # surface as a misleading "Error processing simulation" message.
    if file is None or simulation_index is None:
        return None

    try:
        simulations, _ = load_simulation_data(file)
        # Reject negative indices too, so a bad value can never silently
        # select a simulation counted from the end of the list.
        if simulations is None or not 0 <= simulation_index < len(simulations):
            return None

        # Create temporary file for RRD. The directory is intentionally not
        # removed here: the returned path is consumed after this function
        # returns.
        temp_dir = tempfile.mkdtemp()
        rrd_path = os.path.join(temp_dir, "simulation.rrd")

        # Log selected simulation
        log_simulation(simulations[simulation_index])
        rr.save(rrd_path)
        return rrd_path
    except Exception as e:
        # UI boundary: report and show nothing rather than crash the handler.
        print(f"Error processing simulation: {str(e)}")
        return None
def update_simulation_dropdown(file):
    """Rebuild the simulation dropdown for the given uploaded file.

    The choices become the per-simulation descriptions returned by
    ``load_simulation_data`` (an empty list when none are available) and
    the current selection is cleared.
    """
    descriptions = load_simulation_data(file)[1]
    choices = descriptions or []
    return gr.Dropdown(choices=choices, value=None)
# Gradio UI: upload + simulation picker on the first row, Rerun viewer below.
with gr.Blocks() as demo:
    gr.Markdown(
        "\n"
        "# Camera Simulation Visualizer\n"
        "Upload a JSON file containing camera simulation data and select a simulation to visualize.\n"
    )

    with gr.Row():
        file_input = gr.File(label="Upload Simulation JSON", file_types=[".json"])
        # type="index" makes the handler receive the position of the chosen
        # entry instead of its description text.
        simulation_dropdown = gr.Dropdown(
            label="Select Simulation", choices=[], type="index"
        )

    with gr.Row():
        viewer = Rerun(streaming=False)

    # A new upload repopulates the dropdown choices.
    file_input.change(
        update_simulation_dropdown,
        inputs=[file_input],
        outputs=[simulation_dropdown],
    )

    # Picking a simulation renders it in the viewer.
    simulation_dropdown.change(
        visualize_simulation,
        inputs=[file_input, simulation_dropdown],
        outputs=[viewer],
    )

# Start the app only when executed as a script.
if __name__ == "__main__":
    demo.queue().launch(share=False)