import numpy as np


def discount_rewards(r, gamma=0.99, value_next=0.0):
    """
    Computes the discounted sum of future rewards for use in updating the value estimate.
    :param r: 1-D numpy array of rewards for time-steps t to T.
    :param gamma: Discount factor.
    :param value_next: T+1 value estimate used to bootstrap the returns calculation.
    :return: Discounted sum of future rewards as a numpy array.
    """
    discounted_r = np.zeros_like(r)
    running_add = value_next
    # Accumulate the return backwards in time, bootstrapping from value_next.
    for t in reversed(range(0, r.size)):
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r
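# A minimal usage sketch for discount_rewards (the reward and bootstrap values
# below are made up for illustration):
#     discount_rewards(np.array([1.0, 0.0, 1.0]), gamma=0.99, value_next=0.5)
# returns approximately array([2.4652, 1.4801, 1.495]), i.e. the discounted
# return at each time-step, bootstrapped from the T+1 value estimate.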


def get_gae(rewards, value_estimates, value_next=0.0, gamma=0.99, lambd=0.95):
    """
    Computes the generalized advantage estimate (GAE) for use in updating the policy.
    :param rewards: 1-D numpy array of rewards for time-steps t to T.
    :param value_estimates: 1-D numpy array of value estimates for time-steps t to T.
    :param value_next: Value estimate for time-step T+1.
    :param gamma: Discount factor.
    :param lambd: GAE weighting factor.
    :return: Advantage estimates for time-steps t to T as a numpy array.
    """
    value_estimates = np.append(value_estimates, value_next)
    # TD residuals: delta_t = r_t + gamma * V(s_{t+1}) - V(s_t).
    delta_t = rewards + gamma * value_estimates[1:] - value_estimates[:-1]
    # GAE is the discounted sum of the TD residuals, discounted by gamma * lambd.
    advantage = discount_rewards(r=delta_t, gamma=gamma * lambd)
    return advantage
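# A minimal usage sketch for get_gae (rewards and value estimates are
# illustrative numbers). With lambd=0 the result reduces to one-step TD errors,
# and with lambd=1 to full discounted returns minus the value baseline:
#     get_gae(rewards=np.array([1.0, 0.0, 1.0]),
#             value_estimates=np.array([0.5, 0.4, 0.3]),
#             value_next=0.2, gamma=0.99, lambd=0.95)
# returns approximately array([1.593, 0.742, 0.898]).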


def lambda_return(r, value_estimates, gamma=0.99, lambd=0.8, value_next=0.0):
    """
    Computes the lambda-return for time-steps t to T.
    :param r: 1-D numpy array of rewards for time-steps t to T.
    :param value_estimates: 1-D numpy array of value estimates for time-steps t to T.
    :param gamma: Discount factor.
    :param lambd: Lambda weighting factor.
    :param value_next: Value estimate for time-step T+1.
    :return: Lambda-returns for time-steps t to T as a numpy array.
    """
    returns = np.zeros_like(r)
    # Bootstrap the final return from the T+1 value estimate.
    returns[-1] = r[-1] + gamma * value_next
    # Each earlier return blends the next step's value estimate (weight 1 - lambd)
    # with the recursively computed lambda-return of the next time-step.
    for t in reversed(range(0, r.size - 1)):
        returns[t] = (
            gamma * lambd * returns[t + 1]
            + r[t]
            + (1 - lambd) * gamma * value_estimates[t + 1]
        )
    return returns
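

if __name__ == "__main__":
    # Quick sanity check with illustrative reward / value numbers (a minimal
    # sketch; a real trainer would supply rollout data here instead).
    rewards = np.array([1.0, 0.0, 1.0])
    values = np.array([0.5, 0.4, 0.3])
    print("discounted returns:", discount_rewards(rewards, gamma=0.99, value_next=0.2))
    print("GAE advantages:", get_gae(rewards, values, value_next=0.2))
    print("lambda returns:", lambda_return(rewards, values, value_next=0.2))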