File size: 6,392 Bytes
5769ee4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import numpy as np
import torch
from torch import Tensor
from typing import Union


class RandomPedestrians:
    """
    Batched random pedestrians.
    There are two types of pedestrians, slow and fast ones.
    Each pedestrian type is walking mainly at its constant favored speed but at each time step there is a probability that it changes its pace.

    Args:
        batch_size: int number of scenes in the batch
        dt: float time step to use in the trajectory sequence
        fast_speed: float fast walking speed for the random pedestrian in meters/seconds
        slow_speed: float slow walking speed for the random pedestrian in meters/seconds
        p_change_pace: float probability that a slow (resp. fast) pedestrian walk at fast_speed (resp. slow_speed) at each time step
        proportion_fast: float proportion of the pedestrians that are mainly walking at fast_speed
        is_torch: bool set to True to produce Tensor batches and to False to produce numpy arrays
    """

    def __init__(
        self,
        batch_size: int,
        dt: float = 0.1,
        fast_speed: float = 2,
        slow_speed: float = 1,
        p_change_pace: float = 0.1,
        proportion_fast: float = 0.5,
        is_torch: bool = False,
    ) -> None:

        self.is_torch = is_torch
        self.fast_speed: float = fast_speed
        self.slow_speed: float = slow_speed
        self.dt: float = dt
        self.p_change_pace: float = p_change_pace
        self.batch_size: int = batch_size

        self.propotion_fast: float = proportion_fast
        if self.is_torch:
            self.is_fast_type: Tensor = torch.from_numpy(
                np.random.binomial(1, self.propotion_fast, [batch_size, 1, 1]).astype(
                    "float32"
                )
            )
            self.is_currently_fast: Tensor = self.is_fast_type.clone()
            self.initial_position: Tensor = torch.zeros([batch_size, 1, 2])
            self.position: Tensor = self.initial_position.clone()
            self._angle: Tensor = (2 * torch.rand(batch_size, 1) - 1) * np.pi
            self.unit_velocity: Tensor = torch.stack(
                (torch.cos(self._angle), torch.sin(self._angle)), -1
            )
        else:
            self.is_fast_type: np.ndarray = np.random.binomial(
                1, self.propotion_fast, [batch_size, 1, 1]
            )
            self.is_currently_fast: np.ndarray = self.is_fast_type.copy()
            self.initial_position: np.ndarray = np.zeros([batch_size, 1, 2])
            self.position: np.ndarray = self.initial_position.copy()
            self._angle: np.ndarray = np.random.uniform(-np.pi, np.pi, (batch_size, 1))
            self.unit_velocity: np.ndarray = np.stack(
                (np.cos(self._angle), np.sin(self._angle)), -1
            )

    @property
    def angle(self):
        return self._angle

    @angle.setter
    def angle(self, angle: Union[np.ndarray, torch.Tensor]):
        assert self.batch_size == angle.shape[0]
        if self.is_torch:
            assert isinstance(angle, torch.Tensor)
            self._angle = angle
            self.unit_velocity = torch.stack(
                (torch.cos(self._angle), torch.sin(self._angle)), -1
            )
        else:
            assert isinstance(angle, np.ndarray)
            self._angle = angle
            self.unit_velocity = np.stack(
                (np.cos(self._angle), np.sin(self._angle)), -1
            )

    def step(self) -> None:
        """
        Forward one time step, update the speed selection and the current position.
        """
        self.update_speed()
        self.update_position()

    def update_speed(self) -> None:
        """
        Update the speed as a random selection between favored speed and the other speed with probability self.p_change_pace.
        """
        if self.is_torch:
            do_flip = (
                torch.from_numpy(
                    np.random.binomial(1, self.p_change_pace, self.batch_size).astype(
                        "float32"
                    )
                )
                == 1
            )
            self.is_currently_fast = self.is_fast_type.clone()
        else:
            do_flip = np.random.binomial(1, self.p_change_pace, self.batch_size) == 1
            self.is_currently_fast = self.is_fast_type.copy()
        self.is_currently_fast[do_flip] = 1 - self.is_fast_type[do_flip]

    def update_position(self) -> None:
        """
        Update the position as current position + time_step*speed*(cos(angle), sin(angle))
        """
        self.position += (
            self.dt
            * (
                self.slow_speed
                + (self.fast_speed - self.slow_speed) * self.is_currently_fast
            )
            * self.unit_velocity
        )

    def travel_distance(self) -> Union[np.ndarray, Tensor]:
        """
        Return the travel distance between initial position and current position.
        """
        if self.is_torch:
            return torch.sqrt(
                torch.sum(torch.square(self.position - self.initial_position), -1)
            )
        else:
            return np.sqrt(np.sum(np.square(self.position - self.initial_position), -1))

    def get_final_position(self, time: float) -> Union[np.ndarray, Tensor]:
        """
        Return a sample of pedestrian final positions using their speed distribution.
        (This is stochastic, different samples will produce different results).
        Args:
            time: The final time at which to get the position
        Returns:
            The batch of final positions
        """
        num_steps = int(round(time / self.dt))
        if self.is_torch:
            cumulative_change_state = torch.from_numpy(
                np.random.binomial(
                    num_steps, self.p_change_pace, [self.batch_size, 1, 1]
                ).astype("float32")
            )
        else:
            cumulative_change_state = np.random.binomial(
                num_steps, self.p_change_pace, [self.batch_size, 1, 1]
            )

        num_fast_steps = (
            num_steps - 2 * cumulative_change_state
        ) * self.is_fast_type + cumulative_change_state

        return self.position + self.unit_velocity * self.dt * (
            self.slow_speed * num_steps
            + (self.fast_speed - self.slow_speed) * num_fast_steps
        )