from dataclasses import dataclass
import os
from typing import Union, List, Optional
import warnings
import copy

from mmcv import Config
import numpy as np
import torch
from torch import Tensor
from torch.utils.data import Dataset

from risk_biased.scene_dataset.pedestrian import RandomPedestrians
from risk_biased.utils.torch_utils import torch_linspace


@dataclass
class RandomSceneParams:
    """Dataclass that defines all the listed parameters that are necessary for a RandomScene object

    Args:
        batch_size: number of scenes in the batch
        time_scene: time length of the scene in seconds
        sample_times: list of times to get the positions
        ego_ref_speed: constant reference speed of the ego vehicle in meters/seconds
        ego_speed_init_low: lowest initial speed of the ego vehicle in meters/seconds
        ego_speed_init_high: highest initial speed of the ego vehicle in meters/seconds
        ego_acceleration_mean_low: lowest mean acceleration of the ego vehicle in m/s^2
        ego_acceleration_mean_high: highest mean acceleration of the ego vehicle in m/s^2
        ego_acceleration_std: std for acceleration of the ego vehicle in m/s^2
        ego_length: length of the ego vehicle in meters
        ego_width: width of the ego vehicle in meters
        dt: time step to use in the trajectory sequence
        fast_speed: fast walking speed for the random pedestrian in meters/seconds
        slow_speed: slow walking speed for the random pedestrian in meters/seconds
        p_change_pace: probability that a slow (resp. fast) pedestrian walks at fast_speed
            (resp. slow_speed) at each time step
        proportion_fast: proportion of the pedestrians that are mainly walking at fast_speed
        perception_noise_std: standard deviation of the gaussian noise that is affecting the
            position observations
    """

    batch_size: int
    time_scene: float
    sample_times: list
    ego_ref_speed: float
    ego_speed_init_low: float
    ego_speed_init_high: float
    ego_acceleration_mean_low: float
    ego_acceleration_mean_high: float
    ego_acceleration_std: float
    ego_length: float
    ego_width: float
    dt: float
    fast_speed: float
    slow_speed: float
    p_change_pace: float
    proportion_fast: float
    perception_noise_std: float

    @staticmethod
    def from_config(cfg: Config) -> "RandomSceneParams":
        """Build the parameters by reading the same-named attributes of cfg.

        Args:
            cfg: configuration object exposing one attribute per dataclass field
        """
        return RandomSceneParams(
            batch_size=cfg.batch_size,
            sample_times=cfg.sample_times,
            time_scene=cfg.time_scene,
            ego_ref_speed=cfg.ego_ref_speed,
            ego_speed_init_low=cfg.ego_speed_init_low,
            ego_speed_init_high=cfg.ego_speed_init_high,
            ego_acceleration_mean_low=cfg.ego_acceleration_mean_low,
            ego_acceleration_mean_high=cfg.ego_acceleration_mean_high,
            ego_acceleration_std=cfg.ego_acceleration_std,
            ego_length=cfg.ego_length,
            ego_width=cfg.ego_width,
            dt=cfg.dt,
            fast_speed=cfg.fast_speed,
            slow_speed=cfg.slow_speed,
            p_change_pace=cfg.p_change_pace,
            proportion_fast=cfg.proportion_fast,
            perception_noise_std=cfg.perception_noise_std,
        )


class RandomScene:
    """
    Batched scenes with one vehicle at constant velocity and one random pedestrian.
    Utility functions to draw the scene and compute risk factors (time to collision etc...)

    Args:
        params: dataclass containing the necessary parameters
        is_torch: set to True to produce Tensor batches and to False to produce numpy arrays
    """

    def __init__(
        self,
        params: RandomSceneParams,
        is_torch: bool = False,
    ) -> None:
        self._is_torch = is_torch
        self._batch_size = params.batch_size
        self._fast_speed = params.fast_speed
        self._slow_speed = params.slow_speed
        self._p_change_pace = params.p_change_pace
        self._proportion_fast = params.proportion_fast
        self.dt = params.dt
        self.sample_times = params.sample_times
        self.ego_ref_speed = params.ego_ref_speed
        self._ego_speed_init_low = params.ego_speed_init_low
        self._ego_speed_init_high = params.ego_speed_init_high
        self._ego_acceleration_mean_low = params.ego_acceleration_mean_low
        self._ego_acceleration_mean_high = params.ego_acceleration_mean_high
        self._ego_acceleration_std = params.ego_acceleration_std
        self.perception_noise_std = params.perception_noise_std
        # The road is long enough for the ego at reference speed plus a fast
        # pedestrian walking along it for the whole scene duration.
        self.road_length = (
            params.ego_ref_speed + params.fast_speed
        ) * params.time_scene
        self.time_scene = params.time_scene
        self.lane_width = 3
        self.sidewalks_width = 1.5
        self.road_width = 2 * self.lane_width + 2 * self.sidewalks_width
        # y = 0 is the center of the ego lane: the scene spans from the outer
        # edge of the lower sidewalk to the outer edge of the upper sidewalk.
        self.bottom = -self.lane_width / 2 - self.sidewalks_width
        self.top = 3 * self.lane_width / 2 + self.sidewalks_width
        # NOTE(review): params.ego_width / params.ego_length are not read here,
        # the ego dimensions are hard-coded. Confirm whether that is intended.
        self.ego_width = 1.75
        self.ego_length = 4
        self.current_time = 0

        # Initial pedestrian positions are uniform over
        # [ego_length / 2, road_length] x [bottom, top], shape (batch_size, 1, 2).
        if self._is_torch:
            pedestrians_x = (
                torch.rand(params.batch_size, 1)
                * (self.road_length - self.ego_length / 2)
                + self.ego_length / 2
            )
            pedestrians_y = (
                torch.rand(params.batch_size, 1) * (self.top - self.bottom)
                + self.bottom
            )
            self._pedestrians_positions = torch.stack(
                (pedestrians_x, pedestrians_y), -1
            )
        else:
            pedestrians_x = np.random.uniform(
                low=self.ego_length / 2,
                high=self.road_length,
                size=(params.batch_size, 1),
            )
            pedestrians_y = np.random.uniform(
                low=self.bottom, high=self.top, size=(params.batch_size, 1)
            )
            self._pedestrians_positions = np.stack((pedestrians_x, pedestrians_y), -1)

        self.pedestrians = RandomPedestrians(
            batch_size=self._batch_size,
            dt=self.dt,
            fast_speed=self._fast_speed,
            slow_speed=self._slow_speed,
            p_change_pace=self._p_change_pace,
            proportion_fast=self._proportion_fast,
            is_torch=self._is_torch,
        )
        self._set_pedestrians()

    @property
    def pedestrians_positions(self):
        """Initial pedestrian positions in scene coordinates (not the relative
        [0, 1] coordinates accepted by set_pedestrians_states)."""
        return self._pedestrians_positions

    def set_pedestrians_states(
        self,
        relative_pedestrians_positions: Union[torch.Tensor, np.ndarray],
        pedestrians_angles: Optional[Union[torch.Tensor, np.ndarray]] = None,
    ):
        """Force pedestrian initial states

        The batch size of the scene becomes the first dimension of the given
        positions, a new RandomPedestrians object is created and the stored
        pedestrian trajectories are resampled.

        Args:
            relative_pedestrians_positions: Relative positions in the scene as percentage
                distance from left to right and from bottom to top. Indexed as
                [:, :, 0] for x and [:, :, 1] for y.
            pedestrians_angles: Pedestrian heading angles in radians
        """
        if self._is_torch:
            assert isinstance(relative_pedestrians_positions, torch.Tensor)
        else:
            assert isinstance(relative_pedestrians_positions, np.ndarray)
        self._batch_size = relative_pedestrians_positions.shape[0]
        if (0 > relative_pedestrians_positions).any() or (
            relative_pedestrians_positions > 1
        ).any():
            warnings.warn(
                "Some of the given pedestrian initial positions are outside of the road range"
            )
        # Map the [0, 1] relative coordinates to the same ranges that __init__
        # samples from: [bottom, top] for y and [ego_length / 2, road_length] for x.
        center_y = (self.top - self.bottom) * relative_pedestrians_positions[
            :, :, 1
        ] + self.bottom
        center_x = (
            self.road_length - self.ego_length / 2
        ) * relative_pedestrians_positions[:, :, 0] + self.ego_length / 2
        if self._is_torch:
            pedestrians_positions = torch.stack([center_x, center_y], -1)
        else:
            pedestrians_positions = np.stack([center_x, center_y], -1)

        self.pedestrians = RandomPedestrians(
            batch_size=self._batch_size,
            dt=self.dt,
            fast_speed=self._fast_speed,
            slow_speed=self._slow_speed,
            p_change_pace=self._p_change_pace,
            proportion_fast=self._proportion_fast,
            is_torch=self._is_torch,
        )
        self._pedestrians_positions = pedestrians_positions
        if pedestrians_angles is not None:
            self.pedestrians.angle = pedestrians_angles
        self._set_pedestrians()

    def _set_pedestrians(self):
        """Sample the pedestrian trajectories at self.sample_times and cache
        them along with their last position."""
        self.pedestrians_trajectories = self.sample_pedestrians_trajectories(
            self.sample_times
        )
        self.final_pedestrians_positions = self.pedestrians_trajectories[:, :, -1]

    def get_ego_ref_trajectory(self, time_sequence: list):
        """
        Returns only one ego reference trajectory and not a batch because it is always the same.

        Args:
            time_sequence: the time points at which to get the positions

        Returns:
            positions (t * ego_ref_speed, 0) of shape (1, 1, len(time_sequence), 2),
            as a float32 Tensor if is_torch else as a numpy array
        """
        out = np.array([[[[t * self.ego_ref_speed, 0] for t in time_sequence]]])
        if self._is_torch:
            return torch.from_numpy(out.astype("float32"))
        else:
            return out

    def get_pedestrians_velocities(self):
        """
        Returns the batch of mean pedestrian velocities between their positions and their final positions.
        """
        return (self.final_pedestrians_positions - self._pedestrians_positions)[
            :, None
        ] / self.time_scene

    def get_ego_ref_velocity(self):
        """
        Returns the reference ego velocity (ego_ref_speed, 0) with shape (1, 1, 1, 2).
        """
        if self._is_torch:
            return torch.from_numpy(
                np.array([[[[self.ego_ref_speed, 0]]]], dtype="float32")
            )
        else:
            return np.array([[[[self.ego_ref_speed, 0]]]])

    def get_ego_ref_position(self):
        """
        Returns the current reference ego position (at set time self.current_time)
        with shape (1, 1, 1, 2).
        """
        if self._is_torch:
            return torch.from_numpy(
                np.array(
                    [[[[self.ego_ref_speed * self.current_time, 0]]]], dtype="float32"
                )
            )
        else:
            return np.array([[[[self.ego_ref_speed * self.current_time, 0]]]])

    def set_current_time(self, time: float):
        """
        Set the current time of the scene.

        Args:
            time : The current time to set. It should be between 0 and
                self.time_scene (both included).
        """
        assert 0 <= time <= self.time_scene
        self.current_time = time

    def sample_ego_velocities(self, time_sequence: list):
        """
        Get ego velocity trajectories following the ego's acceleration distribution and the initial velocity distribution.

        Only the x component is random, the y component of the velocity is always 0.

        Args:
            time_sequence: a list of time points at which to sample the trajectory positions.
        Returns:
            batch of sequence of velocities of shape (batch_size, 1, len(time_sequence), 2)
        """
        vel_traj = []
        # uniform sampling of acceleration_mean between self._ego_acceleration_mean_low and
        # self._ego_acceleration_mean_high
        acceleration_mean = np.random.rand(self._batch_size, 2) * np.array(
            [
                self._ego_acceleration_mean_high - self._ego_acceleration_mean_low,
                0.0,
            ]
        ) + np.array([self._ego_acceleration_mean_low, 0.0])
        t_prev = 0
        # uniform sampling of initial velocity between self._ego_speed_init_low and
        # self._ego_speed_init_high
        vel_prev = np.random.rand(self._batch_size, 2) * np.array(
            [self._ego_speed_init_high - self._ego_speed_init_low, 0.0]
        ) + np.array([self._ego_speed_init_low, 0.0])
        for t in time_sequence:
            # integrate accelerations once to get velocities
            acceleration = acceleration_mean + np.random.randn(
                self._batch_size, 2
            ) * np.array([self._ego_acceleration_std, 0.0])
            vel_prev = vel_prev + acceleration * (t - t_prev)
            t_prev = t
            vel_traj.append(vel_prev)
        vel_traj = np.stack(vel_traj, 1)
        if self._is_torch:
            vel_traj = torch.from_numpy(vel_traj.astype("float32"))
        return vel_traj[:, None]

    def sample_ego_trajectories(self, time_sequence: list):
        """
        Get ego trajectories following the ego's acceleration distribution and the initial velocity distribution.

        Args:
            time_sequence: a list of time points at which to sample the trajectory positions.
        Returns:
            batch of sequence of positions of shape (batch_size, 1, len(time_sequence), 2),
            starting the integration from position (0, 0) at time 0
        """
        vel_traj = self.sample_ego_velocities(time_sequence)
        traj = []
        t_prev = 0
        pos_prev = np.array([[0, 0]], dtype="float32")
        if self._is_torch:
            pos_prev = torch.from_numpy(pos_prev)
        for idx, t in enumerate(time_sequence):
            # integrate velocities once to get positions
            vel = vel_traj[:, :, idx, :]
            pos_prev = pos_prev + vel * (t - t_prev)
            t_prev = t
            traj.append(pos_prev)
        if self._is_torch:
            return torch.stack(traj, -2)
        else:
            return np.stack(traj, -2)

    def sample_pedestrians_trajectories(self, time_sequence: list):
        """
        Produce pedestrian trajectories following the pedestrian behavior distribution
        (it is resampled, the final position will not match self.final_pedestrians_positions)

        Gaussian noise of standard deviation self.perception_noise_std is added
        to every returned position.

        Args:
            time_sequence: a list of time points at which to sample the trajectory positions.
        Returns:
            batch of sequence of positions stacked along axis 2; presumably of shape
            (batch_size, 1, len(time_sequence), 2) given the (batch_size, 1, 2) initial
            positions - TODO confirm against RandomPedestrians.get_final_position
        """
        traj = []
        t_prev = 0
        pos_prev = self.pedestrians_positions
        for t in time_sequence:
            # Accumulate the displacement of the pedestrians over each interval.
            pos_prev = (
                pos_prev
                + self.pedestrians.get_final_position(t - t_prev)
                - self.pedestrians.position
            )
            t_prev = t
            traj.append(pos_prev)
        if self._is_torch:
            traj = torch.stack(traj, 2)
            return traj + torch.randn_like(traj) * self.perception_noise_std
        else:
            traj = np.stack(traj, 2)
            return traj + np.random.randn(*traj.shape) * self.perception_noise_std

    def get_pedestrians_trajectories(self):
        """
        Returns the cached batch of pedestrian trajectories, sampled at self.sample_times.
        """
        return self.pedestrians_trajectories

    def get_pedestrian_trajectory(self, ind: int, time_sequence: Optional[list] = None):
        """
        Returns one pedestrian trajectory of index ind sampled at times set in time_sequence.

        The trajectory is a linear interpolation with len(self.sample_times) points
        between the initial and the final position of the pedestrian.

        Args:
            ind: index of the pedestrian in the batch.
            time_sequence: a list of time points at which to sample the trajectory positions.
                If None, the steps range(int(self.time_scene / self.dt)) are used.
        Returns:
            The interpolated positions selected at the indices int(t / self.dt)
        """
        len_traj = len(self.sample_times)
        if self._is_torch:
            ped_traj = torch_linspace(
                self.pedestrians_positions[ind],
                self.final_pedestrians_positions[ind],
                len_traj,
            )
        else:
            ped_traj = np.linspace(
                self.pedestrians_positions[ind],
                self.final_pedestrians_positions[ind],
                len_traj,
            )
        # NOTE(review): the indices are derived from t / dt while ped_traj has
        # len(self.sample_times) points; this assumes sample_times is spaced by
        # dt - confirm. int() truncates, so float error in t / dt can select the
        # previous step.
        if time_sequence is not None:
            n_steps = [int(t / self.dt) for t in time_sequence]
        else:
            n_steps = range(int(self.time_scene / self.dt))
        return ped_traj[n_steps]


class SceneDataset(Dataset):
    """
    Dataset of scenes with one vehicle at constant velocity and one random pedestrian.
    The scenes are randomly generated so the distribution can be sampled at each batch or pre-fetched.

    Args:
        len: int number of scenes per epoch
        params: dataclass defining all the necessary parameters. It is copied and
            never modified by the dataset.
        pre_fetch: set to True to fetch the whole dataset at initialization
    """

    def __init__(
        self,
        len: int,
        params: RandomSceneParams,
        pre_fetch: bool = True,
    ) -> None:
        super().__init__()
        self._pre_fetch = pre_fetch
        self._len = len
        self._sample_times = params.sample_times
        self.params = copy.deepcopy(params)
        if self._pre_fetch:
            # Generate the whole dataset as one batch of `len` scenes. The batch
            # size is overridden on a private copy so that the params object
            # owned by the caller is left untouched.
            pre_fetch_params = copy.deepcopy(params)
            pre_fetch_params.batch_size = len
            self.scene_set = RandomScene(
                pre_fetch_params, is_torch=True
            ).sample_pedestrians_trajectories(self._sample_times)

    def __len__(self) -> int:
        return self._len

    # This is a hack, get item only returns the index so that the collate_fn can handle making the batch without looping on RandomScene creation.
    def __getitem__(self, index: int) -> Tensor:
        return index

    def collate_fn(self, index_list: list) -> Tensor:
        """Build a batch of pedestrian trajectories from a list of indices.

        Args:
            index_list: indices produced by __getitem__

        Returns:
            the pre-fetched trajectories at the given indices if pre_fetch was set,
            otherwise freshly sampled trajectories for len(index_list) new scenes
            (the indices themselves are then ignored)
        """
        if self._pre_fetch:
            return self.scene_set[torch.from_numpy(np.array(index_list))]
        else:
            self.params.batch_size = len(index_list)
            return RandomScene(
                self.params,
                is_torch=True,
            ).sample_pedestrians_trajectories(self._sample_times)


# Call this function to create a dataset as a .npy file that can be loaded as a numpy array with np.load(file_name.npy)
def save_dataset(file_path: str, size: int, config: Config):
    """
    Save a dataset at file_path using the configuration.

    The saved array stacks the pedestrian trajectories and the ego trajectories
    along a new first axis. The configuration (with batch_size set to size) is
    written as "config.py" in the directory of file_path.

    Args:
        file_path: Where to save the dataset
        size: Number of samples to save
        config: Configuration to use for the dataset generation. It is copied,
            the given object is not modified.
    """
    dir_path = os.path.dirname(file_path)
    config_path = os.path.join(dir_path, "config.py")
    config = copy.deepcopy(config)
    config.batch_size = size
    params = RandomSceneParams.from_config(config)
    scene = RandomScene(
        params,
        is_torch=False,
    )
    data_pedestrian = scene.sample_pedestrians_trajectories(config.sample_times)
    data_ego = scene.sample_ego_trajectories(config.sample_times)
    data = np.stack([data_pedestrian, data_ego], 0)
    np.save(file_path, data)
    # Cannot use config.dump here because it is buggy and does not work if config was not loaded from a file.
    with open(config_path, "w", encoding="utf-8") as f:
        f.write(config.pretty_text)


def load_create_dataset(
    config: Config,
    base_dir=None,
) -> List:
    """
    Load the dataset described by its config if it exists or create one.

    Directories named "scene_dataset_000", "scene_dataset_001", ... are scanned
    in order under base_dir until one is missing. The first one whose saved
    config has the same dataset_parameters as config is used. If none matches,
    the dataset is generated in the first missing directory.

    Args:
        config: Configuration to use for the dataset
        base_dir: Where to look for the dataset or to save it. Defaults to the
            "data" directory two levels above the directory of this file.

    Returns:
        A list of float32 tensors, one per entry of config.datasets, in that order.
    """
    if base_dir is None:
        base_dir = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "..", "..", "data"
        )
    found = False
    dataset_out = []
    i = 0
    dir_path = os.path.join(base_dir, f"scene_dataset_{i:03d}")
    while os.path.exists(dir_path):
        config_path = os.path.join(dir_path, "config.py")
        if os.path.exists(config_path):
            config_check = Config.fromfile(config_path)
            if config_check.dataset_parameters == config.dataset_parameters:
                found = True
                break
        else:
            warnings.warn(
                f"Dataset directory {dir_path} exists but doesn't contain a config file. Cannot use it."
            )
        i += 1
        dir_path = os.path.join(base_dir, f"scene_dataset_{i:03d}")

    if not found:
        # dir_path is now the first index that does not exist yet.
        print("Dataset not found, creating a new one.")
        os.makedirs(dir_path)
        for dataset in config.datasets:
            dataset_name = f"scene_dataset_{dataset}.npy"
            dataset_path = os.path.join(dir_path, dataset_name)
            save_dataset(dataset_path, config.datasets_sizes[dataset], config)
    if found:
        print(f"Loading existing dataset at {dir_path}.")

    for dataset in config.datasets:
        dataset_path = os.path.join(dir_path, f"scene_dataset_{dataset}.npy")
        dataset_out.append(torch.from_numpy(np.load(dataset_path).astype("float32")))

    return dataset_out