# ding/reward_model/guided_cost_reward_model.py
from typing import List, Dict, Any
from easydict import EasyDict
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Independent, Normal
from ding.utils import REWARD_MODEL_REGISTRY
from ding.utils.data import default_collate
from .base_reward_model import BaseRewardModel
class GuidedCostNN(nn.Module):
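    """
    Overview:
        A simple three-layer MLP that maps a concatenated (observation, action) vector to a scalar cost.
    """
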
def __init__(
self,
input_size,
hidden_size=128,
output_size=1,
):
super(GuidedCostNN, self).__init__()
self.net = nn.Sequential(
nn.Linear(input_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, output_size),
)
def forward(self, x):
return self.net(x)
@REWARD_MODEL_REGISTRY.register('guided_cost')
class GuidedCostRewardModel(BaseRewardModel):
"""
Overview:
        Reward model of the Guided Cost Learning (GCL) algorithm (https://arxiv.org/pdf/1603.00448.pdf).
Interface:
        ``estimate``, ``train``, ``collect_data``, ``clear_data``, \
        ``__init__``, ``state_dict``, ``load_state_dict``, ``learn``, \
        ``state_dict_reward_model``, ``load_state_dict_reward_model``
Config:
== ==================== ======== ============= ======================================== ================
ID Symbol Type Default Value Description Other(Shape)
== ==================== ======== ============= ======================================== ================
1 ``type`` str guided_cost | Reward model register name, refer |
| to registry ``REWARD_MODEL_REGISTRY`` |
2 | ``continuous`` bool True | Whether action is continuous |
3 | ``learning_rate`` float 0.001 | learning rate for optimizer |
4 | ``update_per_`` int 100 | Number of updates per collect |
| ``collect`` | |
5 | ``batch_size`` int 64 | Training batch size |
6 | ``hidden_size`` int 128 | Linear model hidden size |
7 | ``action_shape`` int 1 | Action space shape |
8 | ``log_every_n`` int 50 | add loss to log every n iteration |
| ``_train`` | |
9 | ``store_model_`` int 100 | save model every n iteration |
| ``every_n_train`` |
== ==================== ======== ============= ======================================== ================
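    Examples:
        A minimal construction sketch (illustrative, not taken from the original file); it assumes a \
        config that additionally provides ``input_size`` (observation dim + action dim) on top of the \
        defaults above, and a TensorBoard ``SummaryWriter`` for logging:

        >>> from easydict import EasyDict
        >>> from torch.utils.tensorboard import SummaryWriter
        >>> cfg = EasyDict(dict(GuidedCostRewardModel.config, input_size=5))
        >>> reward_model = GuidedCostRewardModel(cfg, device='cpu', tb_logger=SummaryWriter('./log'))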
"""
config = dict(
# (str) Reward model register name, refer to registry ``REWARD_MODEL_REGISTRY``.
type='guided_cost',
# (float) The step size of gradient descent.
learning_rate=1e-3,
# (int) Action space shape, such as 1.
action_shape=1,
# (bool) Whether action is continuous.
continuous=True,
# (int) How many samples in a training batch.
batch_size=64,
# (int) Linear model hidden size.
hidden_size=128,
# (int) How many updates(iterations) to train after collector's one collection.
# Bigger "update_per_collect" means bigger off-policy.
# collect data -> update policy-> collect data -> ...
update_per_collect=100,
# (int) Add loss to log every n iteration.
log_every_n_train=50,
# (int) Save model every n iteration.
store_model_every_n_train=100,
)
def __init__(self, config: EasyDict, device: str, tb_logger: 'SummaryWriter') -> None: # noqa
super(GuidedCostRewardModel, self).__init__()
self.cfg = config
self.action_shape = self.cfg.action_shape
assert device == "cpu" or device.startswith("cuda")
self.device = device
self.tb_logger = tb_logger
self.reward_model = GuidedCostNN(config.input_size, config.hidden_size)
self.reward_model.to(self.device)
self.opt = optim.Adam(self.reward_model.parameters(), lr=config.learning_rate)
    def train(self, expert_demo: List[dict], samp: List[dict], iter: int, step: int) -> None:
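        """
        Overview:
            Perform one gradient update of the cost network on a mixture of expert demonstrations \
            and policy samples, using the guided cost (IOC) objective; ``iter`` and ``step`` are the \
            current iteration and step counters, used only for TensorBoard logging.
        """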
device_0 = expert_demo[0]['obs'].device
device_1 = samp[0]['obs'].device
for i in range(len(expert_demo)):
expert_demo[i]['prob'] = torch.FloatTensor([1]).to(device_0)
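        # Compute the sampling policy's likelihood of each sampled action; it is used below as an
        # importance weight when estimating the partition function in the IOC loss.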
if self.cfg.continuous:
for i in range(len(samp)):
(mu, sigma) = samp[i]['logit']
dist = Independent(Normal(mu, sigma), 1)
next_action = samp[i]['action']
log_prob = dist.log_prob(next_action)
samp[i]['prob'] = torch.exp(log_prob).unsqueeze(0).to(device_1)
else:
for i in range(len(samp)):
probs = F.softmax(samp[i]['logit'], dim=-1)
prob = probs[samp[i]['action']]
samp[i]['prob'] = prob.to(device_1)
# Mix the expert data and sample data to train the reward model.
samp.extend(expert_demo)
expert_demo = default_collate(expert_demo)
samp = default_collate(samp)
cost_demo = self.reward_model(
torch.cat([expert_demo['obs'], expert_demo['action'].float().reshape(-1, self.action_shape)], dim=-1)
)
cost_samp = self.reward_model(
torch.cat([samp['obs'], samp['action'].float().reshape(-1, self.action_shape)], dim=-1)
)
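        # Guided cost (IOC) objective: L = E_demo[c(s, a)] + log E_samp[exp(-c(s, a)) / q(a|s)],
        # where q(a|s) is the sampling policy's likelihood computed above (importance weighting of
        # the partition-function estimate). A small epsilon keeps the division numerically safe.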
prob = samp['prob'].unsqueeze(-1)
loss_IOC = torch.mean(cost_demo) + \
torch.log(torch.mean(torch.exp(-cost_samp)/(prob+1e-7)))
# UPDATING THE COST FUNCTION
self.opt.zero_grad()
loss_IOC.backward()
self.opt.step()
if iter % self.cfg.log_every_n_train == 0:
self.tb_logger.add_scalar('reward_model/loss_iter', loss_IOC, iter)
self.tb_logger.add_scalar('reward_model/loss_step', loss_IOC, step)
def estimate(self, data: list) -> List[Dict]:
        # NOTE: the estimate method of the GCL algorithm differs slightly from that of other IRL algorithms,
        # because the deepcopy of the input data is performed before the learner's train loop.
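        # The network predicts a cost c(s, a); the reward handed back to the learner is its negation, r = -c.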
train_data_augmented = data
for i in range(len(train_data_augmented)):
with torch.no_grad():
reward = self.reward_model(
torch.cat([train_data_augmented[i]['obs'], train_data_augmented[i]['action'].float()]).unsqueeze(0)
).squeeze(0)
train_data_augmented[i]['reward'] = -reward
return train_data_augmented
def collect_data(self, data) -> None:
"""
Overview:
            Collect training data. Not implemented here, since the reward model (i.e. online_net) is only \
            trained once. If online_net is trained continuously, this method should contain a concrete \
            implementation.
"""
# if online_net is trained continuously, there should be some implementations in collect_data method
pass
def clear_data(self):
"""
Overview:
            Clear collected data. Not implemented here, since the reward model (i.e. online_net) is only \
            trained once. If online_net is trained continuously, this method should contain a concrete \
            implementation.
"""
# if online_net is trained continuously, there should be some implementations in clear_data method
pass
def state_dict_reward_model(self) -> Dict[str, Any]:
return {
'model': self.reward_model.state_dict(),
'optimizer': self.opt.state_dict(),
}
def load_state_dict_reward_model(self, state_dict: Dict[str, Any]) -> None:
self.reward_model.load_state_dict(state_dict['model'])
self.opt.load_state_dict(state_dict['optimizer'])
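

# ---------------------------------------------------------------------------
# Minimal smoke-test sketch (illustrative only, not part of the original
# module): builds a tiny continuous-action setup, runs one training step and
# one reward-estimation pass. The observation/action dimensions, the fake
# transition format, and the SummaryWriter path are assumptions made purely
# for demonstration.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    from torch.utils.tensorboard import SummaryWriter

    obs_dim, act_dim = 4, 1
    cfg = EasyDict(dict(GuidedCostRewardModel.config, input_size=obs_dim + act_dim, batch_size=8))
    reward_model = GuidedCostRewardModel(cfg, device='cpu', tb_logger=SummaryWriter('./gcl_demo_log'))

    def fake_transition():
        # One transition dict in the format consumed by ``train``/``estimate`` above.
        return {
            'obs': torch.randn(obs_dim),
            'action': torch.randn(act_dim),
            'logit': [torch.zeros(act_dim), torch.ones(act_dim)],  # (mu, sigma) of the Gaussian policy
        }

    expert_demo = [fake_transition() for _ in range(8)]
    samp = [fake_transition() for _ in range(8)]
    reward_model.train(expert_demo, samp, iter=0, step=0)
    print(reward_model.estimate([fake_transition()])[0]['reward'])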