# Spaces: Running  (web-page header captured during extraction; not part of the module)
import numpy as np | |
import torch | |
from torch import Tensor | |
from typing import Union | |
class RandomPedestrians:
    """
    Batched random pedestrians.

    There are two types of pedestrians, slow and fast ones. Each pedestrian
    walks mainly at its favored speed, but at every time step there is a
    probability that it walks at the other speed instead. Every pedestrian
    walks in a straight line along a heading drawn uniformly in [-pi, pi).

    State is stored batched, either as numpy arrays or as torch tensors:
        is_fast_type / is_currently_fast: shape [batch_size, 1, 1]
        initial_position / position:      shape [batch_size, 1, 2]
        angle:                            shape [batch_size, 1]
        unit_velocity:                    shape [batch_size, 1, 2]

    Args:
        batch_size: number of scenes in the batch
        dt: time step to use in the trajectory sequence
        fast_speed: fast walking speed for the random pedestrian in meters/seconds
        slow_speed: slow walking speed for the random pedestrian in meters/seconds
        p_change_pace: probability that a slow (resp. fast) pedestrian walks at
            fast_speed (resp. slow_speed) at each time step
        proportion_fast: proportion of the pedestrians that are mainly walking
            at fast_speed
        is_torch: set to True to produce Tensor batches and to False to produce
            numpy arrays
    """

    def __init__(
        self,
        batch_size: int,
        dt: float = 0.1,
        fast_speed: float = 2,
        slow_speed: float = 1,
        p_change_pace: float = 0.1,
        proportion_fast: float = 0.5,
        is_torch: bool = False,
    ) -> None:
        self.is_torch: bool = is_torch
        self.fast_speed: float = fast_speed
        self.slow_speed: float = slow_speed
        self.dt: float = dt
        self.p_change_pace: float = p_change_pace
        self.batch_size: int = batch_size
        self.proportion_fast: float = proportion_fast
        # Historical misspelling, kept so existing readers of this attribute
        # keep working.
        self.propotion_fast: float = proportion_fast

        # The pedestrian type is always drawn with numpy (even in torch mode)
        # and before the heading, so seeded runs consume the RNG streams in
        # the same order as before.
        fast_draw = np.random.binomial(1, proportion_fast, [batch_size, 1, 1])

        self.is_fast_type: Union[np.ndarray, Tensor]
        self.is_currently_fast: Union[np.ndarray, Tensor]
        self.initial_position: Union[np.ndarray, Tensor]
        self.position: Union[np.ndarray, Tensor]
        self._angle: Union[np.ndarray, Tensor]
        if is_torch:
            self.is_fast_type = torch.from_numpy(fast_draw.astype("float32"))
            self.is_currently_fast = self.is_fast_type.clone()
            self.initial_position = torch.zeros([batch_size, 1, 2])
            self.position = self.initial_position.clone()
            # torch.rand is uniform in [0, 1): rescale to [-pi, pi).
            self._angle = (2 * torch.rand(batch_size, 1) - 1) * np.pi
        else:
            self.is_fast_type = fast_draw
            self.is_currently_fast = self.is_fast_type.copy()
            self.initial_position = np.zeros([batch_size, 1, 2])
            self.position = self.initial_position.copy()
            self._angle = np.random.uniform(-np.pi, np.pi, (batch_size, 1))
        self.unit_velocity: Union[np.ndarray, Tensor] = self._heading(self._angle)

    def _heading(
        self, angle: Union[np.ndarray, Tensor]
    ) -> Union[np.ndarray, Tensor]:
        """Return (cos(angle), sin(angle)) stacked on a new trailing axis."""
        if self.is_torch:
            return torch.stack((torch.cos(angle), torch.sin(angle)), -1)
        return np.stack((np.cos(angle), np.sin(angle)), -1)

    @property
    def angle(self) -> Union[np.ndarray, Tensor]:
        """Heading angle of each pedestrian, in radians."""
        return self._angle

    @angle.setter
    def angle(self, angle: Union[np.ndarray, Tensor]) -> None:
        """
        Replace the heading angles and recompute the matching unit velocities.

        Args:
            angle: batch of angles in radians whose first dimension is
                batch_size; must be a Tensor when is_torch is True and a numpy
                array otherwise.
        """
        # Asserts are kept (rather than raising another exception type) so the
        # failure mode seen by existing callers does not change.
        assert self.batch_size == angle.shape[0], "angle must have batch_size rows"
        if self.is_torch:
            assert isinstance(angle, torch.Tensor), "expected a torch.Tensor"
        else:
            assert isinstance(angle, np.ndarray), "expected a numpy array"
        self._angle = angle
        self.unit_velocity = self._heading(angle)

    def step(self) -> None:
        """
        Forward one time step, update the speed selection and the current position.
        """
        self.update_speed()
        self.update_position()

    def update_speed(self) -> None:
        """
        Update the speed as a random selection between favored speed and the
        other speed with probability self.p_change_pace.

        The selection is redrawn from the favored type at every call: a
        pedestrian that walked at its non-favored speed during the previous
        step goes back to its favored speed unless it is drawn again.
        """
        flip_draw = np.random.binomial(1, self.p_change_pace, self.batch_size)
        if self.is_torch:
            do_flip = torch.from_numpy(flip_draw.astype("float32")) == 1
            self.is_currently_fast = self.is_fast_type.clone()
        else:
            do_flip = flip_draw == 1
            self.is_currently_fast = self.is_fast_type.copy()
        # do_flip is a boolean mask over the batch axis; flipped pedestrians
        # take the opposite of their favored type for this step.
        self.is_currently_fast[do_flip] = 1 - self.is_fast_type[do_flip]

    def update_position(self) -> None:
        """
        Update the position as current position + time_step*speed*(cos(angle), sin(angle))
        """
        # is_currently_fast is 0 or 1, so this picks slow_speed or fast_speed.
        current_speed = (
            self.slow_speed
            + (self.fast_speed - self.slow_speed) * self.is_currently_fast
        )
        self.position += self.dt * current_speed * self.unit_velocity

    def travel_distance(self) -> Union[np.ndarray, Tensor]:
        """
        Return the travel distance between initial position and current position.

        Returns:
            Euclidean distance for each pedestrian, shape [batch_size, 1]
        """
        offset = self.position - self.initial_position
        if self.is_torch:
            return torch.sqrt(torch.sum(torch.square(offset), -1))
        return np.sqrt(np.sum(np.square(offset), -1))

    def get_final_position(self, time: float) -> Union[np.ndarray, Tensor]:
        """
        Return a sample of pedestrian final positions using their speed distribution.
        (This is stochastic, different samples will produce different results).

        The sample starts from the current position and does not modify the
        state of the pedestrians.

        Args:
            time: The final time at which to get the position

        Returns:
            The batch of final positions
        """
        num_steps = int(round(time / self.dt))
        # Number of steps (out of num_steps) walked at the non-favored speed.
        cumulative_change_state = np.random.binomial(
            num_steps, self.p_change_pace, [self.batch_size, 1, 1]
        )
        if self.is_torch:
            cumulative_change_state = torch.from_numpy(
                cumulative_change_state.astype("float32")
            )
        # Fast type: num_steps - changes fast steps; slow type: changes fast steps.
        num_fast_steps = (
            num_steps - 2 * cumulative_change_state
        ) * self.is_fast_type + cumulative_change_state
        return self.position + self.unit_velocity * self.dt * (
            self.slow_speed * num_steps
            + (self.fast_speed - self.slow_speed) * num_fast_steps
        )