from dataclasses import dataclass
import os
from typing import Union, List, Optional
import warnings
import copy

from mmcv import Config
import numpy as np
import torch
from torch import Tensor
from torch.utils.data import Dataset

from risk_biased.scene_dataset.pedestrian import RandomPedestrians
from risk_biased.utils.torch_utils import torch_linspace


@dataclass
class RandomSceneParams:
    """Dataclass that defines all the listed parameters that are necessary for a RandomScene object

    Args:
        batch_size: number of scenes in the batch
        time_scene: time length of the scene in seconds
        sample_times: list of times at which to get the positions
        ego_ref_speed: constant reference speed of the ego vehicle in meters/second
        ego_speed_init_low: lowest initial speed of the ego vehicle in meters/second
        ego_speed_init_high: highest initial speed of the ego vehicle in meters/second
        ego_acceleration_mean_low: lowest mean acceleration of the ego vehicle in m/s^2
        ego_acceleration_mean_high: highest mean acceleration of the ego vehicle in m/s^2
        ego_acceleration_std: standard deviation of the ego vehicle acceleration in m/s^2
        ego_length: length of the ego vehicle in meters
        ego_width: width of the ego vehicle in meters
        dt: time step to use in the trajectory sequence
        fast_speed: fast walking speed for the random pedestrian in meters/second
        slow_speed: slow walking speed for the random pedestrian in meters/second
        p_change_pace: probability that a slow (resp. fast) pedestrian walks at fast_speed (resp. slow_speed) at each time step
        proportion_fast: proportion of the pedestrians that mainly walk at fast_speed
        perception_noise_std: standard deviation of the Gaussian noise affecting the position observations
    """
    batch_size: int
    time_scene: float
    sample_times: list
    ego_ref_speed: float
    ego_speed_init_low: float
    ego_speed_init_high: float
    ego_acceleration_mean_low: float
    ego_acceleration_mean_high: float
    ego_acceleration_std: float
    ego_length: float
    ego_width: float
    dt: float
    fast_speed: float
    slow_speed: float
    p_change_pace: float
    proportion_fast: float
    perception_noise_std: float

    @staticmethod
    def from_config(cfg: Config):
        return RandomSceneParams(
            batch_size=cfg.batch_size,
            sample_times=cfg.sample_times,
            time_scene=cfg.time_scene,
            ego_ref_speed=cfg.ego_ref_speed,
            ego_speed_init_low=cfg.ego_speed_init_low,
            ego_speed_init_high=cfg.ego_speed_init_high,
            ego_acceleration_mean_low=cfg.ego_acceleration_mean_low,
            ego_acceleration_mean_high=cfg.ego_acceleration_mean_high,
            ego_acceleration_std=cfg.ego_acceleration_std,
            ego_length=cfg.ego_length,
            ego_width=cfg.ego_width,
            dt=cfg.dt,
            fast_speed=cfg.fast_speed,
            slow_speed=cfg.slow_speed,
            p_change_pace=cfg.p_change_pace,
            proportion_fast=cfg.proportion_fast,
            perception_noise_std=cfg.perception_noise_std,
        )
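

# A minimal usage sketch (not part of the original module): the numeric values
# below are illustrative assumptions, not taken from the project's config files.
# RandomSceneParams is a plain dataclass, so it can be built directly, or through
# RandomSceneParams.from_config(cfg) when an mmcv Config with matching keys is available.
def _example_scene_params() -> RandomSceneParams:
    return RandomSceneParams(
        batch_size=4,
        time_scene=5.0,
        sample_times=[0.5 * k for k in range(1, 11)],  # 0.5 s to 5.0 s
        ego_ref_speed=14.0,
        ego_speed_init_low=4.0,
        ego_speed_init_high=20.0,
        ego_acceleration_mean_low=-1.5,
        ego_acceleration_mean_high=1.5,
        ego_acceleration_std=1.0,
        ego_length=4.0,
        ego_width=1.75,
        dt=0.1,
        fast_speed=2.0,
        slow_speed=1.0,
        p_change_pace=0.1,
        proportion_fast=0.5,
        perception_noise_std=0.02,
    )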


class RandomScene:
    """
    Batched scenes with one vehicle at constant velocity and one random pedestrian.
    Provides utility functions to draw the scene and compute risk factors (time to collision, etc.).

    Args:
        params: dataclass containing the necessary parameters
        is_torch: set to True to produce Tensor batches and to False to produce numpy arrays
    """

    def __init__(
        self,
        params: RandomSceneParams,
        is_torch: bool = False,
    ) -> None:
        self._is_torch = is_torch
        self._batch_size = params.batch_size
        self._fast_speed = params.fast_speed
        self._slow_speed = params.slow_speed
        self._p_change_pace = params.p_change_pace
        self._proportion_fast = params.proportion_fast
        self.dt = params.dt
        self.sample_times = params.sample_times
        self.ego_ref_speed = params.ego_ref_speed
        self._ego_speed_init_low = params.ego_speed_init_low
        self._ego_speed_init_high = params.ego_speed_init_high
        self._ego_acceleration_mean_low = params.ego_acceleration_mean_low
        self._ego_acceleration_mean_high = params.ego_acceleration_mean_high
        self._ego_acceleration_std = params.ego_acceleration_std
        self.perception_noise_std = params.perception_noise_std
        self.road_length = (
            params.ego_ref_speed + params.fast_speed
        ) * params.time_scene
        self.time_scene = params.time_scene
        self.lane_width = 3
        self.sidewalks_width = 1.5
        self.road_width = 2 * self.lane_width + 2 * self.sidewalks_width
        self.bottom = -self.lane_width / 2 - self.sidewalks_width
        self.top = 3 * self.lane_width / 2 + self.sidewalks_width
        self.ego_width = params.ego_width
        self.ego_length = params.ego_length
        self.current_time = 0
        # Draw initial pedestrian positions uniformly: x in [ego_length / 2, road_length],
        # y across the full road width (sidewalks included).
        if self._is_torch:
            pedestrians_x = (
                torch.rand(params.batch_size, 1)
                * (self.road_length - self.ego_length / 2)
                + self.ego_length / 2
            )
            pedestrians_y = (
                torch.rand(params.batch_size, 1) * (self.top - self.bottom)
                + self.bottom
            )
            self._pedestrians_positions = torch.stack(
                (pedestrians_x, pedestrians_y), -1
            )
        else:
            pedestrians_x = np.random.uniform(
                low=self.ego_length / 2,
                high=self.road_length,
                size=(params.batch_size, 1),
            )
            pedestrians_y = np.random.uniform(
                low=self.bottom, high=self.top, size=(params.batch_size, 1)
            )
            self._pedestrians_positions = np.stack((pedestrians_x, pedestrians_y), -1)
        self.pedestrians = RandomPedestrians(
            batch_size=self._batch_size,
            dt=self.dt,
            fast_speed=self._fast_speed,
            slow_speed=self._slow_speed,
            p_change_pace=self._p_change_pace,
            proportion_fast=self._proportion_fast,
            is_torch=self._is_torch,
        )
        self._set_pedestrians()

    @property
    def pedestrians_positions(self):
        # relative_positions = self._pedestrians_positions/[[(self.road_length - self.ego_length / 2), (self.top - self.bottom)]] - [[self.ego_length / 2, self.bottom]]
        return self._pedestrians_positions

    def set_pedestrians_states(
        self,
        relative_pedestrians_positions: Union[torch.Tensor, np.ndarray],
        pedestrians_angles: Optional[Union[torch.Tensor, np.ndarray]] = None,
    ):
        """Force the pedestrian initial states.

        Args:
            relative_pedestrians_positions: Relative positions in the scene as fractions (0 to 1) of the distance from left to right and from bottom to top
            pedestrians_angles: Pedestrian heading angles in radians
        """
        if self._is_torch:
            assert isinstance(relative_pedestrians_positions, torch.Tensor)
        else:
            assert isinstance(relative_pedestrians_positions, np.ndarray)
        self._batch_size = relative_pedestrians_positions.shape[0]
        if (0 > relative_pedestrians_positions).any() or (
            relative_pedestrians_positions > 1
        ).any():
            warnings.warn(
                "Some of the given pedestrian initial positions are outside of the road range"
            )
        center_y = (self.top - self.bottom) * relative_pedestrians_positions[
            :, :, 1
        ] + self.bottom
        center_x = (
            self.road_length - self.ego_length / 2
        ) * relative_pedestrians_positions[:, :, 0] + self.ego_length / 2
        if self._is_torch:
            pedestrians_positions = torch.stack([center_x, center_y], -1)
        else:
            pedestrians_positions = np.stack([center_x, center_y], -1)
        self.pedestrians = RandomPedestrians(
            batch_size=self._batch_size,
            dt=self.dt,
            fast_speed=self._fast_speed,
            slow_speed=self._slow_speed,
            p_change_pace=self._p_change_pace,
            proportion_fast=self._proportion_fast,
            is_torch=self._is_torch,
        )
        self._pedestrians_positions = pedestrians_positions
        if pedestrians_angles is not None:
            self.pedestrians.angle = pedestrians_angles
        self._set_pedestrians()

    def _set_pedestrians(self):
        self.pedestrians_trajectories = self.sample_pedestrians_trajectories(
            self.sample_times
        )
        self.final_pedestrians_positions = self.pedestrians_trajectories[:, :, -1]

    def get_ego_ref_trajectory(self, time_sequence: list):
        """
        Returns only one ego reference trajectory and not a batch because it is always the same.

        Args:
            time_sequence: the time points at which to get the positions
        """
        out = np.array([[[[t * self.ego_ref_speed, 0] for t in time_sequence]]])
        if self._is_torch:
            return torch.from_numpy(out.astype("float32"))
        else:
            return out

    def get_pedestrians_velocities(self):
        """
        Returns the batch of mean pedestrian velocities between their initial positions and their final positions.
        """
        return (self.final_pedestrians_positions - self._pedestrians_positions)[
            :, None
        ] / self.time_scene

    def get_ego_ref_velocity(self):
        """
        Returns the reference ego velocity.
        """
        if self._is_torch:
            return torch.from_numpy(
                np.array([[[[self.ego_ref_speed, 0]]]], dtype="float32")
            )
        else:
            return np.array([[[[self.ego_ref_speed, 0]]]])

    def get_ego_ref_position(self):
        """
        Returns the current reference ego position (at set time self.current_time)
        """
        if self._is_torch:
            return torch.from_numpy(
                np.array(
                    [[[[self.ego_ref_speed * self.current_time, 0]]]], dtype="float32"
                )
            )
        else:
            return np.array([[[[self.ego_ref_speed * self.current_time, 0]]]])

    def set_current_time(self, time: float):
        """
        Set the current time of the scene.

        Args:
            time: The current time to set. It should be between 0 and self.time_scene
        """
        assert 0 <= time <= self.time_scene
        self.current_time = time

    def sample_ego_velocities(self, time_sequence: list):
        """
        Get ego velocity trajectories following the ego's acceleration distribution and the initial
        velocity distribution.

        Args:
            time_sequence: a list of time points at which to sample the trajectory positions.
        Returns:
            batch of sequences of velocities of shape (batch_size, 1, len(time_sequence), 2)
        """
        vel_traj = []
        # uniform sampling of acceleration_mean between self._ego_acceleration_mean_low and
        # self._ego_acceleration_mean_high
        acceleration_mean = np.random.rand(self._batch_size, 2) * np.array(
            [
                self._ego_acceleration_mean_high - self._ego_acceleration_mean_low,
                0.0,
            ]
        ) + np.array([self._ego_acceleration_mean_low, 0.0])
        t_prev = 0
        # uniform sampling of initial velocity between self._ego_speed_init_low and
        # self._ego_speed_init_high
        vel_prev = np.random.rand(self._batch_size, 2) * np.array(
            [self._ego_speed_init_high - self._ego_speed_init_low, 0.0]
        ) + np.array([self._ego_speed_init_low, 0.0])
        for t in time_sequence:
            # integrate accelerations once to get velocities
            acceleration = acceleration_mean + np.random.randn(
                self._batch_size, 2
            ) * np.array([self._ego_acceleration_std, 0.0])
            vel_prev = vel_prev + acceleration * (t - t_prev)
            t_prev = t
            vel_traj.append(vel_prev)
        vel_traj = np.stack(vel_traj, 1)
        if self._is_torch:
            vel_traj = torch.from_numpy(vel_traj.astype("float32"))
        return vel_traj[:, None]

    def sample_ego_trajectories(self, time_sequence: list):
        """
        Get ego trajectories following the ego's acceleration distribution and the initial velocity
        distribution.

        Args:
            time_sequence: a list of time points at which to sample the trajectory positions.
        Returns:
            batch of sequences of positions of shape (batch_size, 1, len(time_sequence), 2)
        """
        vel_traj = self.sample_ego_velocities(time_sequence)
        traj = []
        t_prev = 0
        pos_prev = np.array([[0, 0]], dtype="float32")
        if self._is_torch:
            pos_prev = torch.from_numpy(pos_prev)
        for idx, t in enumerate(time_sequence):
            # integrate velocities once to get positions
            vel = vel_traj[:, :, idx, :]
            pos_prev = pos_prev + vel * (t - t_prev)
            t_prev = t
            traj.append(pos_prev)
        if self._is_torch:
            return torch.stack(traj, -2)
        else:
            return np.stack(traj, -2)

    def sample_pedestrians_trajectories(self, time_sequence: list):
        """
        Produce pedestrian trajectories following the pedestrian behavior distribution
        (it is resampled, so the final position will not match self.final_pedestrians_positions)

        Args:
            time_sequence: a list of time points at which to sample the trajectory positions.
        Returns:
            batch of sequences of positions of shape (batch_size, 1, len(time_sequence), 2)
        """
        traj = []
        t_prev = 0
        pos_prev = self.pedestrians_positions
        for t in time_sequence:
            pos_prev = (
                pos_prev
                + self.pedestrians.get_final_position(t - t_prev)
                - self.pedestrians.position
            )
            t_prev = t
            traj.append(pos_prev)
        if self._is_torch:
            traj = torch.stack(traj, 2)
            return traj + torch.randn_like(traj) * self.perception_noise_std
        else:
            traj = np.stack(traj, 2)
            return traj + np.random.randn(*traj.shape) * self.perception_noise_std

    def get_pedestrians_trajectories(self):
        """
        Returns the batch of pedestrian trajectories sampled at the times in self.sample_times.
        """
        return self.pedestrians_trajectories

    def get_pedestrian_trajectory(self, ind: int, time_sequence: Optional[list] = None):
        """
        Returns one pedestrian trajectory of index ind sampled at the times set in time_sequence.

        Args:
            ind: index of the pedestrian in the batch.
            time_sequence: a list of time points at which to sample the trajectory positions.
        Returns:
            A pedestrian trajectory of shape (len(time_sequence), 2)
        """
        len_traj = len(self.sample_times)
        if self._is_torch:
            ped_traj = torch_linspace(
                self.pedestrians_positions[ind],
                self.final_pedestrians_positions[ind],
                len_traj,
            )
        else:
            ped_traj = np.linspace(
                self.pedestrians_positions[ind],
                self.final_pedestrians_positions[ind],
                len_traj,
            )
        if time_sequence is not None:
            n_steps = [int(t / self.dt) for t in time_sequence]
        else:
            n_steps = range(int(self.time_scene / self.dt))
        return ped_traj[n_steps]
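

# A minimal usage sketch (relies on the illustrative _example_scene_params defined
# above): build a batched RandomScene and sample pedestrian and ego trajectories at
# the configured sample times. With is_torch=True both outputs are Tensors of shape
# (batch_size, 1, len(sample_times), 2).
def _example_sample_scene():
    params = _example_scene_params()
    scene = RandomScene(params, is_torch=True)
    ped_traj = scene.sample_pedestrians_trajectories(params.sample_times)
    ego_traj = scene.sample_ego_trajectories(params.sample_times)
    return ped_traj, ego_traj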


class SceneDataset(Dataset):
    """
    Dataset of scenes with one vehicle at constant velocity and one random pedestrian.
    The scenes are randomly generated, so the distribution can be resampled at each batch or pre-fetched once.

    Args:
        len: number of scenes per epoch
        params: dataclass defining all the necessary parameters
        pre_fetch: set to True to fetch the whole dataset at initialization
    """

    def __init__(
        self,
        len: int,
        params: RandomSceneParams,
        pre_fetch: bool = True,
    ) -> None:
        super().__init__()
        self._pre_fetch = pre_fetch
        self._len = len
        self._sample_times = params.sample_times
        self.params = copy.deepcopy(params)
        params.batch_size = len
        if self._pre_fetch:
            self.scene_set = RandomScene(
                params, is_torch=True
            ).sample_pedestrians_trajectories(self._sample_times)

    def __len__(self) -> int:
        return self._len

    # This is a hack: __getitem__ only returns the index so that collate_fn can build the batch without looping over RandomScene creation.
    def __getitem__(self, index: int) -> Tensor:
        return index

    def collate_fn(self, index_list: list) -> Tensor:
        if self._pre_fetch:
            return self.scene_set[torch.from_numpy(np.array(index_list))]
        else:
            self.params.batch_size = len(index_list)
            return RandomScene(
                self.params,
                is_torch=True,
            ).sample_pedestrians_trajectories(self._sample_times)
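

# A minimal usage sketch: SceneDataset.__getitem__ only returns indices, so its
# collate_fn must be passed to the DataLoader to turn index lists into trajectory
# batches (the dataset length and batch size below are illustrative).
def _example_dataloader_batch() -> Tensor:
    from torch.utils.data import DataLoader

    dataset = SceneDataset(len=128, params=_example_scene_params(), pre_fetch=True)
    loader = DataLoader(dataset, batch_size=16, collate_fn=dataset.collate_fn)
    # Each batch is a Tensor of pedestrian trajectories of shape (16, 1, len(sample_times), 2).
    return next(iter(loader))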


# Call this function to create a dataset as a .npy file that can be loaded as a numpy array with np.load(file_path).
def save_dataset(file_path: str, size: int, config: Config):
    """
    Save a dataset at file_path using the given configuration.

    Args:
        file_path: Where to save the dataset
        size: Number of samples to save
        config: Configuration to use for the dataset generation
    """
    dir_path = os.path.dirname(file_path)
    config_path = os.path.join(dir_path, "config.py")
    config = copy.deepcopy(config)
    config.batch_size = size
    params = RandomSceneParams.from_config(config)
    scene = RandomScene(
        params,
        is_torch=False,
    )
    data_pedestrian = scene.sample_pedestrians_trajectories(config.sample_times)
    data_ego = scene.sample_ego_trajectories(config.sample_times)
    data = np.stack([data_pedestrian, data_ego], 0)
    np.save(file_path, data)
    # Cannot use config.dump here because it is buggy and does not work if config was not loaded from a file.
    with open(config_path, "w", encoding="utf-8") as f:
        f.write(config.pretty_text)
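

# A minimal usage sketch (hypothetical output directory; the config is assumed to
# define the same keys as RandomSceneParams plus sample_times): save_dataset writes
# the stacked pedestrian and ego trajectories to the .npy file and a copy of the
# config as config.py next to it.
def _example_save_dataset(config: Config, out_dir: str = "data/scene_dataset_example"):
    os.makedirs(out_dir, exist_ok=True)
    save_dataset(os.path.join(out_dir, "scene_dataset_train.npy"), size=1000, config=config)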


def load_create_dataset(
    config: Config,
    base_dir=None,
) -> List:
    """
    Load the dataset described by its config if it exists, or create it.

    Args:
        config: Configuration to use for the dataset
        base_dir: Where to look for the dataset or where to save it.

    Returns:
        A list of trajectory tensors, one per entry in config.datasets.
    """
    if base_dir is None:
        base_dir = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "..", "..", "data"
        )
    found = False
    dataset_out = []
    i = 0
    dir_path = os.path.join(base_dir, f"scene_dataset_{i:03d}")
    while os.path.exists(dir_path):
        config_path = os.path.join(dir_path, "config.py")
        if os.path.exists(config_path):
            config_check = Config.fromfile(config_path)
            if config_check.dataset_parameters == config.dataset_parameters:
                found = True
                break
        else:
            warnings.warn(
                f"Dataset directory {dir_path} exists but doesn't contain a config file. Cannot use it."
            )
        i += 1
        dir_path = os.path.join(base_dir, f"scene_dataset_{i:03d}")
    if not found:
        print("Dataset not found, creating a new one.")
        os.makedirs(dir_path)
        for dataset in config.datasets:
            dataset_name = f"scene_dataset_{dataset}.npy"
            dataset_path = os.path.join(dir_path, dataset_name)
            save_dataset(dataset_path, config.datasets_sizes[dataset], config)
    if found:
        print(f"Loading existing dataset at {dir_path}.")
    for dataset in config.datasets:
        dataset_path = os.path.join(dir_path, f"scene_dataset_{dataset}.npy")
        dataset_out.append(
            torch.from_numpy(np.load(dataset_path).astype("float32"))
        )
    return dataset_out
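

# A minimal usage sketch (hypothetical config path): load_create_dataset scans
# base_dir for a scene_dataset_* directory whose saved config matches
# config.dataset_parameters and generates the datasets listed in config.datasets
# otherwise; the returned list follows the order of config.datasets.
#   cfg = Config.fromfile("path/to/learning_config.py")
#   datasets = load_create_dataset(cfg)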