# rerun-viewer / app.py
# Last commit a202f8d (abreza): Enhance log_simulation function with color
# support for vertices and camera trajectory
# File size: 7.05 kB
import json
import numpy as np
import rerun as rr
import spaces
import gradio as gr
from gradio_rerun import Rerun
from scipy.spatial.transform import Rotation
import tempfile
import os
from typing import Optional, Dict, Any, List, Tuple
def vector3_to_numpy(vec):
    """Turn an ``{'x': ..., 'y': ..., 'z': ...}`` mapping into a numpy array.

    The components are read in x, y, z order and returned as a 1-D array of
    length three. A missing component raises ``KeyError``.
    """
    x, y, z = vec['x'], vec['y'], vec['z']
    return np.array([x, y, z])
def euler_to_quaternion(euler):
    """Build a quaternion from an ``{'x', 'y', 'z'}`` mapping of Euler angles.

    The three angles are passed to scipy's ``Rotation.from_euler`` with the
    ``'xyz'`` axis sequence and scipy's default angle unit, and the result of
    ``Rotation.as_quat()`` is returned unchanged.
    """
    angles = [euler[axis] for axis in ('x', 'y', 'z')]
    rotation = Rotation.from_euler('xyz', angles)
    return rotation.as_quat()
def create_subject_mesh(subject):
    """Build a box mesh standing in for one subject.

    A unit cube centred on the origin is scaled per axis by ``subject['size']``
    and then translated to ``subject['position']``.

    Returns:
        A ``(vertices, faces)`` tuple: ``vertices`` holds the eight corner
        positions (one row per corner) and ``faces`` holds twelve triangles
        as rows of three vertex indices.
    """
    center = vector3_to_numpy(subject['position'])
    extent = vector3_to_numpy(subject['size'])

    # Corners 0-3 lie on the z = -0.5 plane, corners 4-7 on z = +0.5.
    unit_corners = np.array([
        [-0.5, -0.5, -0.5], [0.5, -0.5, -0.5], [0.5, 0.5, -0.5], [-0.5, 0.5, -0.5],
        [-0.5, -0.5, 0.5], [0.5, -0.5, 0.5], [0.5, 0.5, 0.5], [-0.5, 0.5, 0.5]
    ])
    vertices = unit_corners * extent.reshape(1, 3) + center.reshape(1, 3)

    # One quad per cube side; each quad (a, b, c, d) is split into the two
    # triangles (a, b, c) and (a, c, d).
    quads = (
        (0, 1, 2, 3),  # front
        (1, 5, 6, 2),  # right
        (5, 4, 7, 6),  # back
        (4, 0, 3, 7),  # left
        (3, 2, 6, 7),  # top
        (4, 5, 1, 0),  # bottom
    )
    faces = np.array([
        list(triangle)
        for a, b, c, d in quads
        for triangle in ((a, b, c), (a, c, d))
    ])
    return vertices, faces
def log_simulation(simulation_data: Dict[str, Any]) -> None:
    """Log a single simulation to Rerun.

    Starts a "camera_simulation" recording and logs, in order: a text summary
    of the instructions, the world coordinate convention, one coloured box
    mesh per subject, the full camera trajectory as points, and finally the
    camera transform, pinhole and a frame counter for every frame on the
    "frame" timeline.

    Args:
        simulation_data: Mapping with the keys ``'subjects'``,
            ``'cameraFrames'`` and ``'instructions'``. An optional
            ``'selectedSubject'`` index marks the subject drawn in red.

    Raises:
        KeyError: If a required key is missing from ``simulation_data`` or
            from one of its subjects, frames or instructions.
    """
    rr.init("camera_simulation")

    subjects = simulation_data['subjects']
    camera_frames = simulation_data['cameraFrames']
    instructions = simulation_data['instructions']
    # Looked up once; the value does not change while iterating subjects.
    selected_subject = simulation_data.get('selectedSubject')

    # NOTE(review): `timeless=` and Mesh3D's `indices=` are kept as in the
    # original; newer Rerun SDKs spell them `static=` and `triangle_indices=`.
    # Confirm against the SDK version this app pins.

    # Log simulation metadata
    instruction_blocks = [
        f"Instruction {i+1}:\n"
        f" Movement: {inst['cameraMovement']}\n"
        f" Easing: {inst['movementEasing']}\n"
        f" Frames: {inst['frameCount']}\n"
        f" Camera Angle: {inst.get('initialCameraAngle', 'N/A')}\n"
        f" Shot Type: {inst.get('initialShotType', 'N/A')}\n"
        f" Subject Index: {inst.get('subjectIndex', 'N/A')}"
        for i, inst in enumerate(instructions)
    ]
    rr.log(
        "metadata/instructions",
        rr.TextDocument("\n".join(instruction_blocks)),
        timeless=True
    )

    # Set up world coordinate system
    rr.log("world", rr.ViewCoordinates.RIGHT_HAND_Y_UP, timeless=True)

    # Log subjects (as simple cubes); the selected one is red, the rest grey.
    selected_color = [0.8, 0.2, 0.2, 1.0]
    default_color = [0.8, 0.8, 0.8, 1.0]
    for idx, subject in enumerate(subjects):
        vertices, faces = create_subject_mesh(subject)
        subject_color = selected_color if idx == selected_subject else default_color
        rr.log(
            f"world/subject_{idx}",
            rr.Mesh3D(
                vertex_positions=vertices,
                indices=faces,
                # Mesh3D takes per-vertex colours through `vertex_colors`;
                # it has no `colors` parameter.
                vertex_colors=np.tile(subject_color, (len(vertices), 1))
            ),
            timeless=True
        )
        # Log subject class
        rr.log(
            f"world/subject_{idx}/class",
            rr.TextDocument(subject['objectClass']),
            timeless=True
        )

    # Log camera trajectory. The reshape keeps the array (N, 3) even when
    # there are no frames, instead of collapsing to shape (0,).
    camera_positions = np.array(
        [vector3_to_numpy(frame['position']) for frame in camera_frames]
    ).reshape(-1, 3)
    rr.log(
        "world/camera_trajectory",
        rr.Points3D(
            camera_positions,
            # Cyan color for trajectory
            colors=np.full((len(camera_positions), 4), [0.0, 0.8, 0.8, 1.0])
        ),
        timeless=True
    )

    # Log camera movement over time
    total_frames = len(camera_frames)
    for frame_idx, camera_frame in enumerate(camera_frames):
        rr.set_time_sequence("frame", frame_idx)
        position = vector3_to_numpy(camera_frame['position'])
        rotation_q = euler_to_quaternion(camera_frame['angle'])

        # Log camera transform
        rr.log(
            "world/camera",
            rr.Transform3D(
                translation=position,
                rotation=rr.Quaternion(xyzw=rotation_q)
            )
        )

        # Log camera frustum
        # NOTE(review): 'focalLength' is forwarded unchanged next to a fixed
        # 1920x1080 resolution - confirm it is expressed in pixels.
        rr.log(
            "world/camera/view",
            rr.Pinhole(
                focal_length=camera_frame['focalLength'],
                width=1920,
                height=1080
            )
        )

        # Log frame number (1-based for display)
        rr.log(
            "metadata/current_frame",
            rr.TextDocument(f"Frame: {frame_idx + 1}/{total_frames}"),
        )
def load_simulation_data(file) -> Tuple[Optional[List[Dict[str, Any]]], Optional[List[str]]]:
    """Load simulations from a JSON file together with a label for each one.

    Args:
        file: The uploaded file. Either an object exposing the path as
            ``.name`` (tempfile-style upload wrapper) or the path itself.
            ``None`` means nothing has been uploaded.

    Returns:
        ``(simulations, descriptions)`` where ``simulations`` is the list
        stored under the top-level ``'simulations'`` key and ``descriptions``
        holds one summary string per simulation. ``(None, None)`` is returned
        when ``file`` is ``None`` or when reading/parsing fails; the failure
        is printed rather than raised so the UI callback never crashes.
    """
    if file is None:
        return None, None

    # Accept both an upload wrapper with a `.name` path and a plain path.
    path = getattr(file, 'name', file)
    try:
        # Context manager so the handle is closed even if parsing fails.
        with open(path, encoding='utf-8') as handle:
            json_data = json.load(handle)
        simulations = json_data['simulations']
        # Create descriptions for each simulation
        descriptions = [
            f"Simulation {i}: {len(sim['subjects'])} subjects, {len(sim['instructions'])} instructions"
            for i, sim in enumerate(simulations)
        ]
        return simulations, descriptions
    except Exception as e:
        # Deliberately broad: this is the UI boundary, and any malformed
        # upload should degrade to "no simulations" instead of an exception.
        print(f"Error loading simulation data: {str(e)}")
        return None, None
@spaces.GPU
def visualize_simulation(file, simulation_index: int) -> Optional[str]:
    """Log the selected simulation and return the path of the saved .rrd file.

    Args:
        file: The uploaded simulation JSON, as accepted by
            ``load_simulation_data``.
        simulation_index: Zero-based position of the simulation to show.
            May be ``None`` when the dropdown selection has been cleared.

    Returns:
        Path to ``simulation.rrd`` inside a fresh temporary directory, or
        ``None`` when there is nothing to show (no file, no selection, index
        out of range) or when processing fails; failures are printed.
    """
    # A cleared dropdown delivers None; treat it as "nothing selected" rather
    # than letting the comparison below raise and be reported as an error.
    if file is None or simulation_index is None:
        return None
    try:
        simulations, _ = load_simulation_data(file)
        # Reject negative values too: they would silently index from the end.
        if simulations is None or not 0 <= simulation_index < len(simulations):
            return None

        # Create temporary file for RRD
        # NOTE(review): the directory is never removed, so one temp directory
        # accumulates per visualisation for the lifetime of the process.
        temp_dir = tempfile.mkdtemp()
        rrd_path = os.path.join(temp_dir, "simulation.rrd")

        # Log selected simulation
        simulation = simulations[simulation_index]
        log_simulation(simulation)
        rr.save(rrd_path)
        return rrd_path
    except Exception as e:
        print(f"Error processing simulation: {str(e)}")
        return None
def update_simulation_dropdown(file):
    """Rebuild the simulation dropdown for a newly uploaded file.

    The choices become the descriptions produced by ``load_simulation_data``
    (an empty list when loading yields none) and the selection is cleared.
    """
    descriptions = load_simulation_data(file)[1]
    choices = descriptions or []
    return gr.Dropdown(choices=choices, value=None)
# Build the Gradio interface: an upload widget and a simulation picker, with
# the Rerun viewer in its own row below them.
with gr.Blocks() as demo:
    gr.Markdown("""
# Camera Simulation Visualizer
Upload a JSON file containing camera simulation data and select a simulation to visualize.
""")
    with gr.Row():
        # Only .json uploads are offered in the file picker.
        file_input = gr.File(
            label="Upload Simulation JSON",
            file_types=[".json"]
        )
        # Starts empty; choices are filled in once a file has been uploaded.
        # type="index" is set so the callback receives the position of the
        # selected entry (visualize_simulation takes an integer index).
        simulation_dropdown = gr.Dropdown(
            label="Select Simulation",
            choices=[],
            type="index"
        )
    with gr.Row():
        # Viewer component fed with the .rrd path returned by the callback.
        viewer = Rerun(streaming=False)
    # When the uploaded file changes, rebuild the dropdown choices from it.
    file_input.change(
        update_simulation_dropdown,
        inputs=[file_input],
        outputs=[simulation_dropdown]
    )
    # When the selection changes, log that simulation and show it in the viewer.
    simulation_dropdown.change(
        visualize_simulation,
        inputs=[file_input, simulation_dropdown],
        outputs=[viewer]
    )
# Launch the app (with request queueing, no public share link) only when run
# as a script.
if __name__ == "__main__":
    demo.queue().launch(share=False)