jmercat's picture
Removed history to avoid any unverified information being released
5769ee4
raw
history blame
6.39 kB
import numpy as np
import torch
from torch import Tensor
from typing import Union
class RandomPedestrians:
"""
Batched random pedestrians.
There are two types of pedestrians, slow and fast ones.
Each pedestrian type is walking mainly at its constant favored speed but at each time step there is a probability that it changes its pace.
Args:
batch_size: int number of scenes in the batch
dt: float time step to use in the trajectory sequence
fast_speed: float fast walking speed for the random pedestrian in meters/seconds
slow_speed: float slow walking speed for the random pedestrian in meters/seconds
p_change_pace: float probability that a slow (resp. fast) pedestrian walk at fast_speed (resp. slow_speed) at each time step
proportion_fast: float proportion of the pedestrians that are mainly walking at fast_speed
is_torch: bool set to True to produce Tensor batches and to False to produce numpy arrays
"""
def __init__(
self,
batch_size: int,
dt: float = 0.1,
fast_speed: float = 2,
slow_speed: float = 1,
p_change_pace: float = 0.1,
proportion_fast: float = 0.5,
is_torch: bool = False,
) -> None:
self.is_torch = is_torch
self.fast_speed: float = fast_speed
self.slow_speed: float = slow_speed
self.dt: float = dt
self.p_change_pace: float = p_change_pace
self.batch_size: int = batch_size
self.propotion_fast: float = proportion_fast
if self.is_torch:
self.is_fast_type: Tensor = torch.from_numpy(
np.random.binomial(1, self.propotion_fast, [batch_size, 1, 1]).astype(
"float32"
)
)
self.is_currently_fast: Tensor = self.is_fast_type.clone()
self.initial_position: Tensor = torch.zeros([batch_size, 1, 2])
self.position: Tensor = self.initial_position.clone()
self._angle: Tensor = (2 * torch.rand(batch_size, 1) - 1) * np.pi
self.unit_velocity: Tensor = torch.stack(
(torch.cos(self._angle), torch.sin(self._angle)), -1
)
else:
self.is_fast_type: np.ndarray = np.random.binomial(
1, self.propotion_fast, [batch_size, 1, 1]
)
self.is_currently_fast: np.ndarray = self.is_fast_type.copy()
self.initial_position: np.ndarray = np.zeros([batch_size, 1, 2])
self.position: np.ndarray = self.initial_position.copy()
self._angle: np.ndarray = np.random.uniform(-np.pi, np.pi, (batch_size, 1))
self.unit_velocity: np.ndarray = np.stack(
(np.cos(self._angle), np.sin(self._angle)), -1
)
@property
def angle(self):
return self._angle
@angle.setter
def angle(self, angle: Union[np.ndarray, torch.Tensor]):
assert self.batch_size == angle.shape[0]
if self.is_torch:
assert isinstance(angle, torch.Tensor)
self._angle = angle
self.unit_velocity = torch.stack(
(torch.cos(self._angle), torch.sin(self._angle)), -1
)
else:
assert isinstance(angle, np.ndarray)
self._angle = angle
self.unit_velocity = np.stack(
(np.cos(self._angle), np.sin(self._angle)), -1
)
def step(self) -> None:
"""
Forward one time step, update the speed selection and the current position.
"""
self.update_speed()
self.update_position()
def update_speed(self) -> None:
"""
Update the speed as a random selection between favored speed and the other speed with probability self.p_change_pace.
"""
if self.is_torch:
do_flip = (
torch.from_numpy(
np.random.binomial(1, self.p_change_pace, self.batch_size).astype(
"float32"
)
)
== 1
)
self.is_currently_fast = self.is_fast_type.clone()
else:
do_flip = np.random.binomial(1, self.p_change_pace, self.batch_size) == 1
self.is_currently_fast = self.is_fast_type.copy()
self.is_currently_fast[do_flip] = 1 - self.is_fast_type[do_flip]
def update_position(self) -> None:
"""
Update the position as current position + time_step*speed*(cos(angle), sin(angle))
"""
self.position += (
self.dt
* (
self.slow_speed
+ (self.fast_speed - self.slow_speed) * self.is_currently_fast
)
* self.unit_velocity
)
def travel_distance(self) -> Union[np.ndarray, Tensor]:
"""
Return the travel distance between initial position and current position.
"""
if self.is_torch:
return torch.sqrt(
torch.sum(torch.square(self.position - self.initial_position), -1)
)
else:
return np.sqrt(np.sum(np.square(self.position - self.initial_position), -1))
def get_final_position(self, time: float) -> Union[np.ndarray, Tensor]:
"""
Return a sample of pedestrian final positions using their speed distribution.
(This is stochastic, different samples will produce different results).
Args:
time: The final time at which to get the position
Returns:
The batch of final positions
"""
num_steps = int(round(time / self.dt))
if self.is_torch:
cumulative_change_state = torch.from_numpy(
np.random.binomial(
num_steps, self.p_change_pace, [self.batch_size, 1, 1]
).astype("float32")
)
else:
cumulative_change_state = np.random.binomial(
num_steps, self.p_change_pace, [self.batch_size, 1, 1]
)
num_fast_steps = (
num_steps - 2 * cumulative_change_state
) * self.is_fast_type + cumulative_change_state
return self.position + self.unit_velocity * self.dt * (
self.slow_speed * num_steps
+ (self.fast_speed - self.slow_speed) * num_fast_steps
)