"""Gradio app that visualizes camera-simulation JSON files with Rerun.

The user uploads a JSON file that holds a list of simulations, picks one from
a dropdown, and the selected simulation (subjects, camera trajectory and
per-frame camera pose) is logged to a Rerun recording shown in the page.
"""

import json
import os
import tempfile
from typing import Optional, Dict, Any, List, Tuple

import numpy as np
import rerun as rr
import spaces
import gradio as gr
from gradio_rerun import Rerun
from scipy.spatial.transform import Rotation

# RGBA colors (0-1 floats) used when logging to Rerun.
_SELECTED_SUBJECT_COLOR = [0.8, 0.2, 0.2, 1.0]
_DEFAULT_SUBJECT_COLOR = [0.8, 0.8, 0.8, 1.0]
_TRAJECTORY_COLOR = [0.0, 0.8, 0.8, 1.0]  # cyan

# Pixel resolution reported for the logged pinhole camera.
_VIEW_WIDTH = 1920
_VIEW_HEIGHT = 1080


def vector3_to_numpy(vec):
    """Convert a Vector3 dictionary with 'x', 'y', 'z' keys to a numpy array."""
    return np.array([vec['x'], vec['y'], vec['z']])


def euler_to_quaternion(euler):
    """Convert an Euler-angle dictionary ('x', 'y', 'z') to a quaternion.

    The angles are passed to SciPy with the 'xyz' sequence and SciPy's default
    unit (radians). The result is in SciPy's default scalar-last (x, y, z, w)
    order.
    NOTE(review): presumably the input data stores radians - confirm against
    the producer of the JSON.
    """
    return Rotation.from_euler(
        'xyz', [euler['x'], euler['y'], euler['z']]
    ).as_quat()


def create_subject_mesh(subject):
    """Create a simple cube mesh for a subject.

    Args:
        subject: Dictionary with 'position' and 'size' Vector3 dictionaries.

    Returns:
        A ``(vertices, faces)`` tuple: an (8, 3) array of corner positions
        (unit cube scaled by size, then translated to position) and a (12, 3)
        array of triangle vertex indices.
    """
    position = vector3_to_numpy(subject['position'])
    size = vector3_to_numpy(subject['size'])

    # Unit cube centered on the origin, scaled per axis and moved into place.
    unit_cube = np.array([
        [-0.5, -0.5, -0.5],
        [0.5, -0.5, -0.5],
        [0.5, 0.5, -0.5],
        [-0.5, 0.5, -0.5],
        [-0.5, -0.5, 0.5],
        [0.5, -0.5, 0.5],
        [0.5, 0.5, 0.5],
        [-0.5, 0.5, 0.5],
    ])
    vertices = unit_cube * size.reshape(1, 3) + position.reshape(1, 3)

    # Two triangles per cube side.
    faces = np.array([
        [0, 1, 2], [0, 2, 3],  # front
        [1, 5, 6], [1, 6, 2],  # right
        [5, 4, 7], [5, 7, 6],  # back
        [4, 0, 3], [4, 3, 7],  # left
        [3, 2, 6], [3, 6, 7],  # top
        [4, 5, 1], [4, 1, 0],  # bottom
    ])

    return vertices, faces


def _format_instruction(number: int, inst: Dict[str, Any]) -> str:
    """Render one instruction dictionary as a multi-line text entry."""
    return (
        f"Instruction {number}:\n"
        f"  Movement: {inst['cameraMovement']}\n"
        f"  Easing: {inst['movementEasing']}\n"
        f"  Frames: {inst['frameCount']}\n"
        f"  Camera Angle: {inst.get('initialCameraAngle', 'N/A')}\n"
        f"  Shot Type: {inst.get('initialShotType', 'N/A')}\n"
        f"  Subject Index: {inst.get('subjectIndex', 'N/A')}"
    )


def log_simulation(simulation_data: Dict[str, Any]) -> None:
    """Log single simulation data to Rerun.

    Starts a new "camera_simulation" recording and logs, in order: the
    instruction summary, the world coordinate convention, one cube per
    subject, the full camera trajectory, and then the camera pose, pinhole
    and frame counter for every frame on the "frame" timeline.

    Args:
        simulation_data: Dictionary with 'subjects', 'cameraFrames' and
            'instructions' lists, and optionally 'selectedSubject'.

    Raises:
        KeyError: If a required key is missing from the simulation data.
    """
    # NOTE(review): newer Rerun releases rename `timeless` to `static` and
    # replace `set_time_sequence` - confirm against the pinned SDK version.
    rr.init("camera_simulation")

    subjects = simulation_data['subjects']
    camera_frames = simulation_data['cameraFrames']
    instructions = simulation_data['instructions']

    # Log simulation metadata
    instruction_text = "\n".join(
        _format_instruction(i + 1, inst)
        for i, inst in enumerate(instructions)
    )
    rr.log(
        "metadata/instructions",
        rr.TextDocument(instruction_text),
        timeless=True,
    )

    # Set up world coordinate system
    rr.log("world", rr.ViewCoordinates.RIGHT_HAND_Y_UP, timeless=True)

    # Log subjects (as simple cubes); the selected one is highlighted.
    selected_subject = simulation_data.get('selectedSubject')
    for idx, subject in enumerate(subjects):
        vertices, faces = create_subject_mesh(subject)
        if idx == selected_subject:
            subject_color = _SELECTED_SUBJECT_COLOR
        else:
            subject_color = _DEFAULT_SUBJECT_COLOR

        rr.log(
            f"world/subject_{idx}",
            rr.Mesh3D(
                vertex_positions=vertices,
                indices=faces,
                # Apply color to all vertices
                colors=np.tile(subject_color, (len(vertices), 1)),
            ),
            timeless=True,
        )

        # Log subject class
        rr.log(
            f"world/subject_{idx}/class",
            rr.TextDocument(subject['objectClass']),
            timeless=True,
        )

    # Log camera trajectory
    camera_positions = np.array(
        [vector3_to_numpy(frame['position']) for frame in camera_frames]
    )
    rr.log(
        "world/camera_trajectory",
        rr.Points3D(
            camera_positions,
            colors=np.full((len(camera_positions), 4), _TRAJECTORY_COLOR),
        ),
        timeless=True,
    )

    # Log camera movement over time
    frame_total = len(camera_frames)
    for frame_idx, camera_frame in enumerate(camera_frames):
        rr.set_time_sequence("frame", frame_idx)

        position = vector3_to_numpy(camera_frame['position'])
        # SciPy returns scalar-last quaternions, matching `xyzw` below.
        rotation_q = euler_to_quaternion(camera_frame['angle'])

        # Log camera transform
        rr.log(
            "world/camera",
            rr.Transform3D(
                translation=position,
                rotation=rr.Quaternion(xyzw=rotation_q),
            ),
        )

        # Log camera frustum
        rr.log(
            "world/camera/view",
            rr.Pinhole(
                focal_length=camera_frame['focalLength'],
                width=_VIEW_WIDTH,
                height=_VIEW_HEIGHT,
            ),
        )

        # Log frame number
        rr.log(
            "metadata/current_frame",
            rr.TextDocument(f"Frame: {frame_idx + 1}/{frame_total}"),
        )


def load_simulation_data(
    file,
) -> Tuple[Optional[List[Dict[str, Any]]], Optional[List[str]]]:
    """Load simulation data from JSON file and return simulations with their descriptions.

    Args:
        file: Uploaded file, either an object exposing the path as ``.name``
            or a plain path string; ``None`` when nothing is uploaded.

    Returns:
        ``(simulations, descriptions)``, or ``(None, None)`` when no file was
        given or the file could not be read or parsed.
    """
    if file is None:
        return None, None

    # Accept both file wrappers (path in `.name`) and plain path strings.
    path = getattr(file, "name", file)

    try:
        # Context manager so the handle is closed even if parsing fails.
        with open(path, encoding="utf-8") as handle:
            json_data = json.load(handle)
        simulations = json_data['simulations']

        # Create descriptions for each simulation
        descriptions = [
            f"Simulation {i}: {len(sim['subjects'])} subjects, "
            f"{len(sim['instructions'])} instructions"
            for i, sim in enumerate(simulations)
        ]

        return simulations, descriptions
    except Exception as e:
        # UI boundary: report the problem and let the caller show nothing.
        print(f"Error loading simulation data: {e}")
        return None, None


@spaces.GPU
def visualize_simulation(file, simulation_index: int) -> Optional[str]:
    """Process selected simulation and create Rerun visualization.

    Args:
        file: Uploaded simulation JSON file (see ``load_simulation_data``).
        simulation_index: Position of the chosen simulation in the file, or
            ``None`` when the dropdown has no selection.

    Returns:
        Path to the saved ``.rrd`` recording, or ``None`` when there is
        nothing to show or processing failed.
    """
    # The dropdown is reset to no selection after each upload, which fires
    # this handler with an index of None; that is not an error.
    if file is None or simulation_index is None:
        return None

    try:
        simulations, _ = load_simulation_data(file)
        if simulations is None:
            return None
        # Reject out-of-range indices, including negatives that would
        # otherwise silently select from the end of the list.
        if not 0 <= simulation_index < len(simulations):
            return None

        # Create temporary file for RRD. The directory is intentionally not
        # removed here: the viewer reads the file after this function returns.
        temp_dir = tempfile.mkdtemp()
        rrd_path = os.path.join(temp_dir, "simulation.rrd")

        # Log selected simulation
        log_simulation(simulations[simulation_index])
        rr.save(rrd_path)

        return rrd_path
    except Exception as e:
        # UI boundary: report the problem and leave the viewer empty.
        print(f"Error processing simulation: {e}")
        return None


def update_simulation_dropdown(file):
    """Update simulation dropdown when file is uploaded.

    Returns a dropdown update whose choices are the simulation descriptions
    from the file (empty when loading fails) with the selection cleared.
    """
    _, descriptions = load_simulation_data(file)
    return gr.Dropdown(choices=descriptions or [], value=None)


# Create Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("""
    # Camera Simulation Visualizer
    Upload a JSON file containing camera simulation data and select a simulation to visualize.
    """)

    with gr.Row():
        file_input = gr.File(
            label="Upload Simulation JSON",
            file_types=[".json"]
        )
        simulation_dropdown = gr.Dropdown(
            label="Select Simulation",
            choices=[],
            type="index"
        )

    with gr.Row():
        viewer = Rerun(streaming=False)

    # Update dropdown when file is uploaded
    file_input.change(
        update_simulation_dropdown,
        inputs=[file_input],
        outputs=[simulation_dropdown]
    )

    # Visualize selected simulation
    simulation_dropdown.change(
        visualize_simulation,
        inputs=[file_input, simulation_dropdown],
        outputs=[viewer]
    )

if __name__ == "__main__":
    demo.queue().launch(share=False)