"""Per-episode trajectory storage and an accumulator that collects transitions
from a vectorized environment into per-env Trajectory objects."""
from dataclasses import dataclass, field
from typing import Generic, List, Optional, Type, TypeVar

import numpy as np

from rl_algo_impls.wrappers.vectorable_wrapper import VecEnvObs


@dataclass
class Trajectory:
    """One episode's transitions, stored as parallel per-step lists."""

    obs: List[np.ndarray] = field(default_factory=list)
    act: List[np.ndarray] = field(default_factory=list)
    next_obs: Optional[np.ndarray] = None
    rew: List[float] = field(default_factory=list)
    terminated: bool = False
    v: List[float] = field(default_factory=list)

    def add(
        self,
        obs: np.ndarray,
        act: np.ndarray,
        next_obs: np.ndarray,
        rew: float,
        terminated: bool,
        v: float,
    ):
        self.obs.append(obs)
        self.act.append(act)
        # Keep the latest next_obs only while the episode is still running; a
        # terminated episode has no successor state to bootstrap a value from.
        self.next_obs = next_obs if not terminated else None
        self.rew.append(rew)
        self.terminated = terminated
        self.v.append(v)

    def __len__(self) -> int:
        return len(self.obs)


T = TypeVar("T", bound=Trajectory)


class TrajectoryAccumulator(Generic[T]):
    """Routes each env's transitions from a vectorized rollout into its own Trajectory."""

    def __init__(self, num_envs: int, trajectory_class: Type[T] = Trajectory) -> None:
        self.num_envs = num_envs
        self.trajectory_class = trajectory_class

        self._trajectories: List[T] = []
        self._current_trajectories = [trajectory_class() for _ in range(num_envs)]

    def step(
        self,
        obs: VecEnvObs,
        action: np.ndarray,
        next_obs: VecEnvObs,
        reward: np.ndarray,
        done: np.ndarray,
        val: np.ndarray,
        *args,
    ) -> None:
        assert isinstance(obs, np.ndarray)
        assert isinstance(next_obs, np.ndarray)
        for i, step_args in enumerate(
            zip(obs, action, next_obs, reward, done, val, *args)
        ):
            trajectory = self._current_trajectories[i]
            # TODO: Eventually take advantage of terminated/truncated differentiation in
            # later versions of gym.
            trajectory.add(*step_args)
            if done[i]:
                self._trajectories.append(trajectory)
                self._current_trajectories[i] = self.trajectory_class()
                self.on_done(i, trajectory)

    @property
    def all_trajectories(self) -> List[T]:
        # Completed episodes plus any non-empty, still-in-progress ones.
        return self._trajectories + [t for t in self._current_trajectories if len(t)]

    def n_timesteps(self) -> int:
        return sum(len(t) for t in self.all_trajectories)

    def on_done(self, env_idx: int, trajectory: T) -> None:
        """Hook called when an env finishes an episode. Override in subclasses."""
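

if __name__ == "__main__":
    # A minimal usage sketch, not part of the original module: random data stands
    # in for a 2-env vectorized rollout. The observation shape, episode
    # boundaries, and the LoggingAccumulator subclass below are illustrative
    # assumptions, not APIs from this repository.

    class LoggingAccumulator(TrajectoryAccumulator[Trajectory]):
        # Hypothetical subclass demonstrating the on_done hook.
        def on_done(self, env_idx: int, trajectory: Trajectory) -> None:
            print(
                f"env {env_idx}: {len(trajectory)} steps, "
                f"return {sum(trajectory.rew):.2f}"
            )

    num_envs = 2
    rng = np.random.default_rng(0)
    acc = LoggingAccumulator(num_envs)

    obs = rng.standard_normal((num_envs, 4)).astype(np.float32)
    for t in range(8):
        action = rng.integers(0, 2, size=num_envs)
        next_obs = rng.standard_normal((num_envs, 4)).astype(np.float32)
        reward = rng.standard_normal(num_envs)
        done = np.array([t == 5, t == 7])  # env 0 ends at t=5, env 1 at t=7
        val = rng.standard_normal(num_envs)
        acc.step(obs, action, next_obs, reward, done, val)
        obs = next_obs

    # 6 + 2 steps from env 0's two episodes, 8 from env 1 -> 16 total.
    print("total timesteps:", acc.n_timesteps())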