|
import torch |
|
from agent_class import ParameterisedPolicy |
|
|
|
def create_cum_rewards(rewards, discount=DISCOUNT): |
|
new_rews = [0] |
|
for el in rewards[::-1]: |
|
val = el + discount * new_rews[-1] |
|
new_rews.append(val) |
|
return torch.tensor(new_rews[1:][::-1], dtype=torch.float32) |
|
|
|
|
|
def play_game(env, model, n_steps=500, render=False): |
|
observation = env.reset() |
|
|
|
rewards, logits = [], [] |
|
|
|
while True: |
|
if render: |
|
env.render() |
|
|
|
(mus, sigmas) = model(torch.tensor(observation, dtype=torch.float32)) |
|
|
|
m = torch.distributions.normal.Normal(mus, sigmas) |
|
action = m.sample() |
|
logit = m.log_prob(action) |
|
observation, reward, done, info = env.step(action.detach().numpy()) |
|
|
|
rewards.append(reward) |
|
logits.append(m.log_prob(action).sum()) |
|
|
|
if done: |
|
break |
|
env.close() |
|
|
|
return rewards, logits |
|
|
|
def draw_gradients_rewards(model, rewards, ep_lengths, ave_over_steps): |
|
|
|
fig, axs = plt.subplot_mosaic([['1', '1', '2', '2'], ['3', '4', '5', '6']], |
|
constrained_layout=False, figsize=(20, 9)) |
|
|
|
axs['1'].plot(np.array(rewards[:ave_over_steps*(len(rewards)//ave_over_steps)])\ |
|
.reshape(-1, ave_over_steps).mean(axis=-1)) |
|
axs['1'].set_title('Sum rewards per episode') |
|
|
|
axs['1'].hlines(200, 0, len(rewards)/ave_over_steps, colors='red') |
|
axs['1'].hlines(150, 0, len(rewards)/ave_over_steps, colors='orange') |
|
axs['1'].hlines(0, 0, len(rewards)/ave_over_steps, colors='green') |
|
|
|
axs['2'].plot(np.array(ep_lengths[:ave_over_steps*(len(ep_lengths)//ave_over_steps)])\ |
|
.reshape(-1, ave_over_steps).mean(axis=-1)) |
|
axs['2'].set_title('Episode length') |
|
|
|
axs['3'].hist(model.lin_1.weight.grad.flatten().detach().numpy(), bins=50); |
|
axs['3'].set_xlabel('Grads in dense layer 1') |
|
|
|
axs['4'].hist(model.lin_2.weight.grad.flatten().detach().numpy(), bins=50); |
|
axs['4'].set_xlabel('Grads in dense layer 2') |
|
|
|
axs['5'].hist(model.lin_3.weight.grad.flatten().detach().numpy(), bins=50); |
|
axs['5'].set_xlabel('Grads in dense layer 3') |
|
|
|
axs['6'].hist(model.lin_4.weight.grad.flatten().detach().numpy(), bins=50); |
|
axs['6'].set_xlabel('Grads in dense layer 4') |
|
|
|
model = ParameterisedPolicy() |
|
opt = torch.optim.Adam(model.parameters(), lr=0.0008) |
|
lr_scheduler = torch.optim.lr_scheduler.StepLR(opt, step_size=4000, gamma=0.7) |
|
rews, ep_lengths = [], [] |
|
|
|
last_max_score = 50 |
|
env = gym.make(env_name) |
|
|
|
for _ in range(int(10e3)): |
|
rewards, logits = play_game(env, model, render=False) |
|
|
|
cum_rewards = create_cum_rewards(rewards, discount=DISCOUNT) |
|
stacked_logits = torch.stack(logits).flatten() |
|
|
|
loss = -(stacked_logits * cum_rewards).mean() |
|
|
|
rews.append(np.sum(rewards)) |
|
ep_lengths.append(len(rewards)) |
|
|
|
opt.zero_grad() |
|
loss.backward() |
|
torch.nn.utils.clip_grad_norm_(model.parameters(), 50) |
|
opt.step() |
|
lr_scheduler.step() |
|
|
|
if _%40 == 0: |
|
if _ > 1: |
|
clear_output() |
|
draw_gradients_rewards(model, rewards=rews, |
|
ep_lengths=ep_lengths, ave_over_steps=40) |
|
plt.show() |
|
|
|
if len(rews) > 40: |
|
agg_rews = np.array(rews[-40*(len(rews)//40):])\ |
|
.reshape(-1, 40).mean(axis=-1) |
|
if (agg_rews[-1] > last_max_score): |
|
last_max_score = agg_rews[-1] |
|
print('NEW BEST MODEL, STEP:', _, 'SCORE: ', last_max_score) |
|
save_path = f'best_models/best_reinforce_lunar_lander_cont_model_{round(last_max_score,3)}.pt' |
|
torch.save(model, save_path) |
|
|