import os |
import sys |
import random |
import json |
import copy |
import enum |
from functools import partial |
import numpy as np |
import torch |
import torch.nn as nn |
import torch.nn.functional as F |
from dizoo.gfootball.model.bots.TamakEriFever.handyrl_core.model import BaseModel, Dense |
from dizoo.gfootball.model.bots.TamakEriFever.football.util import * |
class MultiHeadAttention(nn.Module): |
def __init__(self, in_dim, out_dim, out_heads, relation_dim=0, residual=False, projection=True, layer_norm=True): |
super().__init__() |
self.in_dim = in_dim |
self.out_dim = out_dim |
self.out_heads = out_heads |
self.relation_dim = relation_dim |
assert self.out_dim % self.out_heads == 0 |
self.query_layer = nn.Linear(self.in_dim + self.relation_dim, self.out_dim, bias=False) |
self.key_layer = nn.Linear(self.in_dim + self.relation_dim, self.out_dim, bias=False) |
self.value_layer = nn.Linear(self.in_dim, self.out_dim, bias=False) |
self.residual = residual |
self.projection = projection |
if self.projection: |
self.proj_layer = nn.Linear(self.out_dim, self.out_dim) |
self.layer_norm = layer_norm |
if self.layer_norm: |
self.ln = nn.LayerNorm(self.out_dim) |
self.reset_parameters() |
def reset_parameters(self): |
nn.init.uniform_(self.query_layer.weight, -0.1, 0.1) |
nn.init.uniform_(self.key_layer.weight, -0.1, 0.1) |
nn.init.uniform_(self.value_layer.weight, -0.1, 0.1) |
if self.projection: |
nn.init.uniform_(self.proj_layer.weight, -0.1, 0.1) |
def forward(self, query, key, relation=None, mask=None, key_mask=None, distance=None): |
""" |
Args: |
query (torch.Tensor): [batch, query_len, in_dim] |
key (torch.Tensor): [batch, key_len, in_dim] |
relation (torch.Tensor): [batch, query_len, key_len, relation_dim] |
mask (torch.Tensor): [batch, query_len] |
key_mask (torch.Tensor): [batch, key_len] |
Returns: |
torch.Tensor: [batch, query_len, out_dim] |
""" |
query_len = query.size(-2) |
key_len = key.size(-2) |
head_dim = self.out_dim // self.out_heads |
if key_mask is None: |
if torch.equal(query, key): |
key_mask = mask |
if relation is not None: |
relation = relation.view(-1, query_len, key_len, self.relation_dim) |
query_ = query.view(-1, query_len, 1, self.in_dim).repeat(1, 1, key_len, 1) |
query_ = torch.cat([query_, relation], dim=-1) |
key_ = key.view(-1, 1, key_len, self.in_dim).repeat(1, query_len, 1, 1) |
key_ = torch.cat([key_, relation], dim=-1) |
Q = self.query_layer(query_).view(-1, query_len * key_len, self.out_heads, head_dim) |
K = self.key_layer(key_).view(-1, query_len * key_len, self.out_heads, head_dim) |
Q = Q.transpose(1, 2).contiguous().view(-1, query_len, key_len, head_dim) |
K = K.transpose(1, 2).contiguous().view(-1, query_len, key_len, head_dim) |
attention = (Q * K).sum(dim=-1) |
else: |
Q = self.query_layer(query).view(-1, query_len, self.out_heads, head_dim) |
K = self.key_layer(key).view(-1, key_len, self.out_heads, head_dim) |
Q = Q.transpose(1, 2).contiguous().view(-1, query_len, head_dim) |
K = K.transpose(1, 2).contiguous().view(-1, key_len, head_dim) |
attention = torch.bmm(Q, K.transpose(1, 2)) |
if distance is not None: |
attention = attention - torch.log1p(distance.repeat(self.out_heads, 1, 1)) |
attention = attention * (float(head_dim) ** -0.5) |
if key_mask is not None: |
attention = attention.view(-1, self.out_heads, query_len, key_len) |
attention = attention + ((1 - key_mask) * -1e32).view(-1, 1, 1, key_len) |
attention = F.softmax(attention, dim=-1) |
if mask is not None: |
attention = attention * mask.view(-1, 1, query_len, 1) |
attention = attention.contiguous().view(-1, query_len, key_len) |
V = self.value_layer(key).view(-1, key_len, self.out_heads, head_dim) |
V = V.transpose(1, 2).contiguous().view(-1, key_len, head_dim) |
output = torch.bmm(attention, V).view(-1, self.out_heads, query_len, head_dim) |
output = output.transpose(1, 2).contiguous().view(*query.size()[:-2], query_len, self.out_dim) |
if self.projection: |
output = self.proj_layer(output) |
if self.residual: |
output = output + query |
if self.layer_norm: |
output = self.ln(output) |
if mask is not None: |
output = output * mask.unsqueeze(-1) |
attention = attention.view(*query.size()[:-2], self.out_heads, query_len, key_len).detach() |
return output, attention |
class ResidualBlock(nn.Module): |
def __init__(self, in_channels, out_channels, activation='relu'): |
super().__init__() |
self.in_channels, self.out_channels, self.activation = in_channels, out_channels, activation |
self.blocks = nn.Identity() |
self.activate = nn.ReLU() |
self.shortcut = nn.Identity() |
def forward(self, x): |
residual = x |
if self.should_apply_shortcut: |
residual = self.shortcut(x) |
x = self.blocks(x) |
x += residual |
x = self.activate(x) |
return x |
@property |
def should_apply_shortcut(self): |
return self.in_channels != self.out_channels |
class Conv2dAuto(nn.Conv2d): |
def __init__(self, *args, **kwargs): |
super().__init__(*args, **kwargs) |
self.padding = ( |
self.kernel_size[0] // 2, self.kernel_size[1] // 2 |
) |
class ResNetResidualBlock(ResidualBlock): |
def __init__(self, in_channels, out_channels, expansion=1, downsampling=1, *args, **kwargs): |
super().__init__(in_channels, out_channels, *args, **kwargs) |
self.expansion, self.downsampling, self.conv = expansion, downsampling, partial( |
Conv2dAuto, kernel_size=3, bias=False |
) |
self.shortcut = nn.Sequential( |
nn.Conv2d(self.in_channels, self.expanded_channels, kernel_size=1, stride=self.downsampling, bias=False), |
nn.BatchNorm2d(self.expanded_channels) |
) if self.should_apply_shortcut else None |
@property |
def expanded_channels(self): |
return self.out_channels * self.expansion |
@property |
def should_apply_shortcut(self): |
return self.in_channels != self.expanded_channels |
def activation_func(activation): |
return nn.ModuleDict( |
[ |
['relu', nn.ReLU(inplace=True)], ['leaky_relu', |
nn.LeakyReLU(negative_slope=0.01, inplace=True)], |
['selu', nn.SELU(inplace=True)], ['none', nn.Identity()] |
] |
)[activation] |
def conv_bn(in_channels, out_channels, conv, *args, **kwargs): |
conv3x3 = partial(Conv2dAuto, kernel_size=3, bias=False) |
return nn.Sequential(conv3x3(in_channels, out_channels, *args, **kwargs), nn.BatchNorm2d(out_channels)) |
class ResNetBasicBlock(ResNetResidualBlock): |
""" |
Basic ResNet block composed by two layers of 3x3conv/batchnorm/activation |
""" |
expansion = 1 |
def __init__(self, in_channels, out_channels, *args, **kwargs): |
super().__init__(in_channels, out_channels, *args, **kwargs) |
self.blocks = nn.Sequential( |
conv_bn(self.in_channels, self.out_channels, conv=self.conv, bias=False, stride=self.downsampling), |
activation_func(self.activation), |
conv_bn(self.out_channels, self.expanded_channels, conv=self.conv, bias=False), |
) |
class FootballNet(BaseModel): |
class FootballEncoder(nn.Module): |
def __init__(self, filters): |
super().__init__() |
self.player_embedding = nn.Embedding(32, 5, padding_idx=0) |
self.mode_embedding = nn.Embedding(8, 3, padding_idx=0) |
self.fc_teammate = nn.Linear(23, filters) |
self.fc_opponent = nn.Linear(23, filters) |
self.fc = nn.Linear(filters + 41, filters) |
def forward(self, x): |
bs = x['mode_index'].size(0) |
m_emb = self.mode_embedding(x['mode_index']).view(bs, -1) |
ball = x['ball'] |
s = torch.cat([ball, x['match'], x['distance']['b2o'].view(bs, -1), m_emb], dim=1) |
p_emb_self = self.player_embedding(x['player_index']['self']) |
ball_concat_self = ball.view(bs, 1, -1).repeat(1, x['player']['self'].size(1), 1) |
p_self = torch.cat([x['player']['self'], p_emb_self, ball_concat_self], dim=2) |
p_emb_opp = self.player_embedding(x['player_index']['opp']) |
ball_concat_opp = ball.view(bs, 1, -1).repeat(1, x['player']['opp'].size(1), 1) |
p_opp = torch.cat([x['player']['opp'], p_emb_opp, ball_concat_opp], dim=2) |
p_self = self.fc_teammate(p_self) |
p_opp = self.fc_opponent(p_opp) |
p = F.relu(torch.cat([p_self, p_opp], dim=1)) |
s_concat = s.view(bs, 1, -1).repeat(1, p.size(1), 1) |
""" |
TODO(pu): How to deal with dimension mismatch better? |
original code is: |
p = torch.cat([p, x['distance']['p2bo'].view(bs, p.size(1), -1), s_concat], dim=2) |
""" |
p = torch.cat([p, x['distance']['p2bo'].repeat(1, 2, 1).view(bs, p.size(1), -1), s_concat], dim=2) |
h = F.relu(self.fc(p)) |
rel = None |
distance = None |
return h, rel, distance |
class FootballBlock(nn.Module): |
def __init__(self, filters, heads): |
super().__init__() |
self.attention = MultiHeadAttention(filters, filters, heads, relation_dim=0, residual=True, projection=True) |
def forward(self, x, rel, distance=None): |
h, _ = self.attention(x, x, relation=rel, distance=distance) |
return h |
class FootballControll(nn.Module): |
def __init__(self, filters, final_filters): |
super().__init__() |
self.filters = filters |
self.attention = MultiHeadAttention(filters, filters, 1, residual=False, projection=True) |
self.fc_control = Dense(filters * 3, final_filters, bnunits=final_filters) |
def forward(self, x, e, control_flag): |
x_controled = (x * control_flag).sum(dim=1, keepdim=True) |
e_controled = (e * control_flag).sum(dim=1, keepdim=True) |
h, _ = self.attention(x_controled, x) |
h = torch.cat([x_controled, e_controled, h], dim=2).view(x.size(0), -1) |
h = self.fc_control(h) |
return h |
class FootballHead(nn.Module): |
def __init__(self, filters): |
super().__init__() |
self.head_p = nn.Linear(filters, 19, bias=False) |
self.head_p_special = nn.Linear(filters, 1 + 8 * 4, bias=False) |
self.head_v = nn.Linear(filters, 1, bias=True) |
self.head_r = nn.Linear(filters, 1, bias=False) |
def forward(self, x): |
p = self.head_p(x) |
p2 = self.head_p_special(x) |
v = self.head_v(x) |
r = self.head_r(x) |
return torch.cat([p, p2], -1), v, r |
class CNNModel(nn.Module): |
def __init__(self, final_filters): |
super().__init__() |
self.conv1 = nn.Sequential( |
nn.Conv2d(53, 128, kernel_size=1, stride=1, bias=False), nn.ReLU(inplace=True), |
nn.Conv2d(128, 160, kernel_size=1, stride=1, bias=False), nn.ReLU(inplace=True), |
nn.Conv2d(160, 128, kernel_size=1, stride=1, bias=False), nn.ReLU(inplace=True) |
) |
self.pool1 = nn.AdaptiveAvgPool2d((1, 11)) |
self.conv2 = nn.Sequential( |
nn.BatchNorm2d(128), |
nn.Conv2d(128, 160, kernel_size=(1, 1), stride=1, bias=False), |
nn.ReLU(inplace=True), |
nn.BatchNorm2d(160), |
nn.Conv2d(160, 96, kernel_size=(1, 1), stride=1, bias=False), |
nn.ReLU(inplace=True), |
nn.BatchNorm2d(96), |
nn.Conv2d(96, final_filters, kernel_size=(1, 1), stride=1, bias=False), |
nn.ReLU(inplace=True), |
nn.BatchNorm2d(final_filters), |
) |
self.pool2 = nn.AdaptiveAvgPool2d((1, 1)) |
self.flatten = nn.Flatten() |
def forward(self, x): |
x = x['cnn_feature'] |
x = self.conv1(x) |
x = self.pool1(x) |
x = self.conv2(x) |
x = self.pool2(x) |
x = self.flatten(x) |
return x |
class SMMEncoder(nn.Module): |
class SMMBlock(nn.Module): |
def __init__(self, in_filters, out_filters, residuals=2): |
super().__init__() |
self.conv1 = nn.Conv2d(in_filters, out_filters, kernel_size=3, stride=1, bias=False) |
self.pool1 = nn.MaxPool2d(3, stride=2) |
self.blocks = nn.ModuleList([ResNetBasicBlock(out_filters, out_filters) for _ in range(residuals)]) |
def forward(self, x): |
h = self.conv1(x) |
h = self.pool1(h) |
for block in self.blocks: |
h = block(h) |
return h |
def __init__(self, filters): |
super().__init__() |
self.blocks = nn.ModuleList( |
[ |
self.SMMBlock(4, filters), |
self.SMMBlock(filters, filters), |
self.SMMBlock(filters, filters), |
self.SMMBlock(filters, filters), |
] |
) |
def forward(self, x): |
x = x['smm'] |
h = x |
for block in self.blocks: |
h = block(h) |
h = F.relu(h) |
return h |
class ActionHistoryEncoder(nn.Module): |
def __init__(self, input_size=19, hidden_size=64, num_layers=2, bidirectional=True): |
super().__init__() |
self.action_emd = nn.Embedding(19, 8) |
self.rnn = nn.GRU(8, hidden_size, num_layers, batch_first=True, bidirectional=bidirectional) |
def forward(self, x): |
h = self.action_emd(x['action_history']) |
h = h.squeeze(dim=2) |
self.rnn.flatten_parameters() |
h, _ = self.rnn(h) |
return h |
def __init__(self, env, args={}, action_length=None): |
super().__init__(env, args, action_length) |
blocks = 5 |
filters = 96 |
final_filters = 128 |
smm_filters = 32 |
self.encoder = self.FootballEncoder(filters) |
self.blocks = nn.ModuleList([self.FootballBlock(filters, 8) for _ in range(blocks)]) |
self.control = self.FootballControll(filters, final_filters) |
self.cnn = self.CNNModel(final_filters) |
rnn_hidden = 64 |
self.rnn = self.ActionHistoryEncoder(19, rnn_hidden, 2) |
self.head = self.FootballHead(final_filters + final_filters + rnn_hidden * 2) |
def init_hidden(self, batch_size=None): |
return None |
def forward(self, x, hidden): |
e, rel, distance = self.encoder(x) |
h = e |
for block in self.blocks: |
h = block(h, rel, distance) |
cnn_h = self.cnn(x) |
h = self.control(h, e, x['control_flag']) |
rnn_h = self.rnn(x) |
rnn_h_head_tail = rnn_h[:, 0, :] + rnn_h[:, -1, :] |
rnn_h_plus_stick = torch.cat([rnn_h_head_tail[:, :-4], x['control']], dim=1) |
p, v, r = self.head(torch.cat([ |
h, |
cnn_h.view(cnn_h.size(0), -1), |
rnn_h_plus_stick, |
], axis=-1)) |
return p, torch.tanh(v), torch.tanh(r), hidden |
"controlled_players": 1, |
"players_raw": [ |
{ |
"right_team_active": [True, True, True, True, True, True, True, True, True, True, True], |
"right_team_yellow_card": [False, False, False, False, False, False, False, False, False, False, False], |
"left_team_tired_factor": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], |
"right_team_roles": [0, 2, 1, 1, 3, 5, 5, 5, 6, 9, 7], |
"left_team": [ |
[-1.0110293626785278, -0.0], [-0.4266543984413147, -0.19894461333751678], |
[-0.5055146813392639, -0.06459399312734604], [-0.5055146813392639, 0.06459297984838486], |
[-0.4266543984413147, 0.19894461333751678], [-0.18624374270439148, -0.10739918798208237], |
[-0.270525187253952, -0.0], [-0.18624374270439148, 0.10739918798208237], |
[-0.010110294446349144, -0.21961550414562225], [-0.05055147036910057, -0.0], |
[-0.010110294446349144, 0.21961753070354462] |
], |
"ball": [0.0, -0.0, 0.11061639338731766], |
"ball_owned_team": -1, |
"right_team_direction": [ |
[-0.0, 0.0], [-0.0, 0.0], [-0.0, 0.0], [-0.0, 0.0], [-0.0, 0.0], [-0.0, 0.0], [-0.0, 0.0], [-0.0, 0.0], |
[-0.0, 0.0], [-0.0, 0.0], [-0.0, 0.0] |
], |
"left_team_direction": [ |
[0.0, -0.0], [0.0, -0.0], [0.0, -0.0], [0.0, -0.0], [0.0, -0.0], [0.0, -0.0], [0.0, -0.0], [0.0, -0.0], |
[0.0, -0.0], [0.0, -0.0], [0.0, -0.0] |
], |
"left_team_roles": [0, 2, 1, 1, 3, 5, 5, 5, 6, 9, 7], |
"score": [0, 0], |
"left_team_active": [True, True, True, True, True, True, True, True, True, True, True], |
"game_mode": 0, |
"steps_left": 3001, |
"ball_direction": [-0.0, 0.0, 0.006163952872157097], |
"ball_owned_player": -1, |
"right_team": [ |
[1.0110293626785278, 0.0], [0.4266543984413147, 0.19894461333751678], |
[0.5055146813392639, 0.06459399312734604], [0.5055146813392639, -0.06459297984838486], |
[0.4266543984413147, -0.19894461333751678], [0.18624374270439148, 0.10739918798208237], |
[0.270525187253952, 0.0], [0.18624374270439148, -0.10739918798208237], |
[0.010110294446349144, 0.21961550414562225], [-0.0, -0.02032535709440708], [-0.0, 0.02032535709440708] |
], |
"left_team_yellow_card": [False, False, False, False, False, False, False, False, False, False, False], |
"ball_rotation": [0.0, -0.0, 0.0], |
"right_team_tired_factor": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], |
"designated": 6, |
"active": 6, |
"sticky_actions": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] |
} |
] |
} |
INFO_TEMPLATE = {'half_step': 1500} |
def feature_from_states(states, info, player): |
obs_history_ = [s[player]['observation']['players_raw'][0] for s in reversed(states[-HISTORY_LENGTH:])] |
obs_history = obs_history_ + [obs_history_[-1]] * (HISTORY_LENGTH - len(obs_history_)) |
obs = obs_history[0] |
action_history_ = [s[player]['action'][0] for s in reversed(states[-HISTORY_LENGTH:])] |
action_history = action_history_ + [0] * (HISTORY_LENGTH - len(action_history_)) |
""" |
・left players (x) |
・left players (y) |
・right players (x) |
・right players (y) |
・ball (x) |
・ball (y) |
・left goal (x) |
・left goal (y) |
・right goal (x) |
・right goal (y) |
・active (x) |
・active (y) |
・left players (x) - right players (x) |
・left players (y) - right players (y) |
・left players (x) - ball (x) |
・left players (y) - ball (y) |
・left players (x) - goal (x) |
・left players (y) - goal (y) |
・left players (x) - active (x) |
・left players (y) - active (y) |
・left players direction (x) |
・left players direction (y) |
・right players direction (x) |
・right players direction (y) |
・left players direction (x) - right players direction (x) |
・left players direction (y) - right players direction (y) |
""" |
obs_left_team = np.array(obs['left_team']) |
left_player_x = np.repeat(obs_left_team[:, 0][..., None], 11, axis=1) |
left_player_y = np.repeat(obs_left_team[:, 1][..., None], 11, axis=1) |
obs_right_team = np.array(obs['right_team']) |
right_player_x = np.repeat(obs_right_team[:, 0][..., None], 11, axis=1).transpose(1, 0) |
right_player_y = np.repeat(obs_right_team[:, 1][..., None], 11, axis=1).transpose(1, 0) |
obs_ball = np.array(obs['ball']) |
ball_x = np.ones((11, 11)) * obs_ball[0] |
ball_y = np.ones((11, 11)) * obs_ball[1] |
ball_z = np.ones((11, 11)) * obs_ball[2] |
left_goal, right_goal = [-1, 0], [1, 0] |
left_goal_x = np.ones((11, 11)) * left_goal[0] |
left_goal_y = np.ones((11, 11)) * left_goal[1] |
right_goal_x = np.ones((11, 11)) * right_goal[0] |
right_goal_y = np.ones((11, 11)) * right_goal[1] |
side_line_y = [-.42, .42] |
side_line_y_top = np.ones((11, 11)) * side_line_y[0] |
side_line_y_bottom = np.ones((11, 11)) * side_line_y[1] |
active = np.array(obs['active']) |
active_player_x = np.repeat(obs_left_team[active][0][..., None, None], 11, axis=1).repeat(11, axis=0) |
active_player_y = np.repeat(obs_left_team[active][1][..., None, None], 11, axis=1).repeat(11, axis=0) |
left_minus_right_player_x = obs_left_team[:, 0][..., None] - obs_right_team[:, 0] |
left_minus_right_player_y = obs_left_team[:, 1][..., None] - obs_right_team[:, 1] |
left_minus_ball_x = (obs_left_team[:, 0][..., None] - obs_ball[0]).repeat(11, axis=1) |
left_minus_ball_y = (obs_left_team[:, 1][..., None] - obs_ball[1]).repeat(11, axis=1) |
left_minus_right_goal_x = (obs_left_team[:, 0][..., None] - right_goal[0]).repeat(11, axis=1) |
left_minus_right_goal_y = (obs_left_team[:, 1][..., None] - right_goal[1]).repeat(11, axis=1) |
left_minus_left_goal_x = (obs_left_team[:, 0][..., None] - left_goal[0]).repeat(11, axis=1) |
left_minus_left_goal_y = (obs_left_team[:, 1][..., None] - left_goal[1]).repeat(11, axis=1) |
right_minus_right_goal_x = (obs_right_team[:, 0][..., None] - right_goal[0]).repeat(11, axis=1).transpose(1, 0) |
right_minus_right_goal_y = (obs_right_team[:, 1][..., None] - right_goal[1]).repeat(11, axis=1).transpose(1, 0) |
right_minus_left_goal_x = (obs_right_team[:, 0][..., None] - left_goal[0]).repeat(11, axis=1).transpose(1, 0) |
right_minus_left_goal_y = (obs_right_team[:, 1][..., None] - left_goal[1]).repeat(11, axis=1).transpose(1, 0) |
left_minus_active_x = (obs_left_team[:, 0][..., None] - obs_left_team[active][0]).repeat(11, axis=1) |
left_minus_active_y = (obs_left_team[:, 1][..., None] - obs_left_team[active][1]).repeat(11, axis=1) |
right_minus_ball_x = (obs_right_team[:, 0][..., None] - obs_ball[0]).repeat(11, axis=1).transpose(1, 0) |
right_minus_ball_y = (obs_right_team[:, 1][..., None] - obs_ball[1]).repeat(11, axis=1).transpose(1, 0) |
right_minus_active_x = (obs_right_team[:, 0][..., None] - obs_left_team[active][0]).repeat( |
11, axis=1 |
).transpose(1, 0) |
right_minus_active_y = (obs_right_team[:, 1][..., None] - obs_left_team[active][1]).repeat( |
11, axis=1 |
).transpose(1, 0) |
left_minus_side_top = np.abs(obs_left_team[:, 1][..., None] - side_line_y[0]).repeat(11, axis=1) |
left_minus_side_bottom = np.abs(obs_left_team[:, 1][..., None] - side_line_y[1]).repeat(11, axis=1) |
right_minus_side_top = np.abs(obs_right_team[:, 1][..., None] - side_line_y[0]).repeat(11, axis=1).transpose(1, 0) |
right_minus_side_bottom = np.abs(obs_right_team[:, 1][..., None] - side_line_y[1]).repeat( |
11, axis=1 |
).transpose(1, 0) |
obs_left_team_direction = np.array(obs['left_team_direction']) |
left_player_direction_x = np.repeat(obs_left_team_direction[:, 0][..., None], 11, axis=1) |
left_player_direction_y = np.repeat(obs_left_team_direction[:, 1][..., None], 11, axis=1) |
obs_right_team_direction = np.array(obs['right_team_direction']) |
right_player_direction_x = np.repeat(obs_right_team_direction[:, 0][..., None], 11, axis=1).transpose(1, 0) |
right_player_direction_y = np.repeat(obs_right_team_direction[:, 1][..., None], 11, axis=1).transpose(1, 0) |
obs_ball_direction = np.array(obs['ball_direction']) |
ball_direction_x = np.ones((11, 11)) * obs_ball_direction[0] |
ball_direction_y = np.ones((11, 11)) * obs_ball_direction[1] |
ball_direction_z = np.ones((11, 11)) * obs_ball_direction[2] |
left_minus_right_player_direction_x = obs_left_team_direction[:, 0][..., None] - obs_right_team_direction[:, 0] |
left_minus_right_player_direction_y = obs_left_team_direction[:, 1][..., None] - obs_right_team_direction[:, 1] |
left_minus_ball_direction_x = (obs_left_team_direction[:, 0][..., None] - obs_ball_direction[0]).repeat(11, axis=1) |
left_minus_ball_direction_y = (obs_left_team_direction[:, 1][..., None] - obs_ball_direction[1]).repeat(11, axis=1) |
right_minus_ball_direction_x = (obs_right_team_direction[:, 0][..., None] - obs_ball_direction[0]).repeat( |
11, axis=1 |
).transpose(1, 0) |
right_minus_ball_direction_y = (obs_right_team_direction[:, 1][..., None] - obs_ball_direction[1]).repeat( |
11, axis=1 |
).transpose(1, 0) |
obs_ball_rotation = np.array(obs['ball_rotation']) |
ball_rotation_x = np.ones((11, 11)) * obs_ball_rotation[0] |
ball_rotation_y = np.ones((11, 11)) * obs_ball_rotation[1] |
ball_rotation_z = np.ones((11, 11)) * obs_ball_rotation[2] |
cnn_feature = np.stack( |
[ |
left_player_x, |
left_player_y, |
right_player_x, |
right_player_y, |
ball_x, |
ball_y, |
ball_z, |
left_goal_x, |
left_goal_y, |
right_goal_x, |
right_goal_y, |
side_line_y_top, |
side_line_y_bottom, |
active_player_x, |
active_player_y, |
left_minus_right_player_x, |
left_minus_right_player_y, |
left_minus_right_goal_x, |
left_minus_right_goal_y, |
left_minus_left_goal_x, |
left_minus_left_goal_y, |
right_minus_right_goal_x, |
right_minus_right_goal_y, |
right_minus_left_goal_x, |
right_minus_left_goal_y, |
left_minus_side_top, |
left_minus_side_bottom, |
right_minus_side_top, |
right_minus_side_bottom, |
right_minus_ball_x, |
right_minus_ball_y, |
right_minus_active_x, |
right_minus_active_y, |
left_minus_ball_x, |
left_minus_ball_y, |
left_minus_active_x, |
left_minus_active_y, |
ball_direction_x, |
ball_direction_y, |
ball_direction_z, |
left_minus_ball_direction_x, |
left_minus_ball_direction_y, |
right_minus_ball_direction_x, |
right_minus_ball_direction_y, |
left_player_direction_x, |
left_player_direction_y, |
right_player_direction_x, |
right_player_direction_y, |
left_minus_right_player_direction_x, |
left_minus_right_player_direction_y, |
ball_rotation_x, |
ball_rotation_y, |
ball_rotation_z, |
], |
axis=0 |
) |
BALL_OWEND_1HOT = {-1: [0, 0], 0: [1, 0], 1: [0, 1]} |
ball_owned_team_ = obs['ball_owned_team'] |
ball_owned_team = BALL_OWEND_1HOT[ball_owned_team_] |
PLAYER_1HOT = np.concatenate([np.eye(11), np.zeros((1, 11))]) |
ball_owned_player_ = PLAYER_1HOT[obs['ball_owned_player']] |
if ball_owned_team_ == -1: |
my_ball_owned_player = PLAYER_1HOT[-1] |
op_ball_owned_player = PLAYER_1HOT[-1] |
elif ball_owned_team_ == 0: |
my_ball_owned_player = ball_owned_player_ |
op_ball_owned_player = PLAYER_1HOT[-1] |
else: |
my_ball_owned_player = PLAYER_1HOT[-1] |
op_ball_owned_player = ball_owned_player_ |
ball_features = np.concatenate([obs['ball'], obs['ball_direction'], obs['ball_rotation']]).astype(np.float32) |
left_team_features = np.concatenate( |
[ |
[[1] for _ in obs['left_team']], |
obs['left_team'], |
obs['left_team_direction'], |
[[v] for v in obs['left_team_tired_factor']], |
[[v] for v in obs['left_team_yellow_card']], |
[[v] for v in obs['left_team_active']], |
my_ball_owned_player[..., np.newaxis] |
], |
axis=1 |
).astype(np.float32) |
left_team_indice = np.arange(0, 11, dtype=np.int32) |
right_team_features = np.concatenate( |
[ |
[[0] for _ in obs['right_team']], |
obs['right_team'], |
obs['right_team_direction'], |
[[v] for v in obs['right_team_tired_factor']], |
[[v] for v in obs['right_team_yellow_card']], |
[[v] for v in obs['right_team_active']], |
op_ball_owned_player[..., np.newaxis] |
], |
axis=1 |
).astype(np.float32) |
right_team_indice = np.arange(0, 11, dtype=np.int32) |
def get_distance(xy1, xy2): |
return (((xy1 - xy2) ** 2).sum(axis=-1)) ** 0.5 |
def get_line_distance(x1, x2): |
return np.abs(x1 - x2) |
def multi_scale(x, scale): |
return 2 / (1 + np.exp(-np.array(x)[..., np.newaxis] / np.array(scale))) |
both_team = np.array(obs['left_team'] + obs['right_team'], dtype=np.float32) |
ball = np.array([obs['ball'][:2]], dtype=np.float32) |
goal = np.array([[-1, 0], [1, 0]], dtype=np.float32) |
goal_line_x = np.array([-1, 1], dtype=np.float32) |
side_line_y = np.array([-.42, .42], dtype=np.float32) |
b2g_distance = get_distance(ball, goal) |
b2gl_distance = get_line_distance(ball[0][0], goal_line_x) |
b2sl_distance = get_line_distance(ball[0][1], side_line_y) |
b2o_distance = np.concatenate([b2g_distance, b2gl_distance, b2sl_distance], axis=-1) |
p2b_distance = get_distance(both_team[:, np.newaxis, :], ball[np.newaxis, :, :]) |
p2g_distance = get_distance(both_team[:, np.newaxis, :], goal[np.newaxis, :, :]) |
p2gl_distance = get_line_distance(both_team[:, :1], goal_line_x[np.newaxis, :]) |
p2sl_distance = get_line_distance(both_team[:, 1:], side_line_y[np.newaxis, :]) |
p2bo_distance = np.concatenate([p2b_distance, p2g_distance, p2gl_distance, p2sl_distance], axis=-1) |
p2p_distance = get_distance(both_team[:, np.newaxis, :], both_team[np.newaxis, :, :]) |
control_flag_ = np.array(PLAYER_1HOT[obs['active']], dtype=np.float32) |
control_flag = np.concatenate([control_flag_, np.zeros(len(obs['right_team']))])[..., np.newaxis] |
DIR = [ |
[-1, 0], |
[-.707, -.707], |
[0, 1], |
[.707, -.707], |
[1, 0], |
[.707, .707], |
[0, -1], |
[-.707, .707] |
] |
sticky_direction = DIR[obs['sticky_actions'][:8].index(1)] if 1 in obs['sticky_actions'][:8] else [0, 0] |
sticky_flags = obs['sticky_actions'][8:] |
control_features = np.concatenate([ |
sticky_direction, |
sticky_flags, |
]).astype(np.float32) |
if obs['steps_left'] > info['half_step']: |
steps_left_half = obs['steps_left'] - info['half_step'] |
else: |
steps_left_half = obs['steps_left'] |
match_features = np.concatenate( |
[ |
multi_scale(obs['score'], [1, 3]).ravel(), |
multi_scale(obs['score'][0] - obs['score'][1], [1, 3]), |
multi_scale(obs['steps_left'], [10, 100, 1000, 10000]), |
multi_scale(steps_left_half, [10, 100, 1000, 10000]), |
ball_owned_team, |
] |
).astype(np.float32) |
mode_index = np.array([obs['game_mode']], dtype=np.int32) |
action_history = np.array(action_history, dtype=np.int32)[..., None] |
return { |
'ball': ball_features, |
'match': match_features, |
'player': { |
'self': left_team_features, |
'opp': right_team_features |
}, |
'control': control_features, |
'player_index': { |
'self': left_team_indice, |
'opp': right_team_indice |
}, |
'mode_index': mode_index, |
'control_flag': control_flag, |
'distance': { |
'p2p': p2p_distance, |
'p2bo': p2bo_distance, |
'b2o': b2o_distance |
}, |
'cnn_feature': cnn_feature, |
'action_history': action_history |
} |
Action.LongPass: 20, |
Action.HighPass: 28, |
Action.ShortPass: 36, |
Action.Shot: 44, |
} |
class Environment: |
ACTION_LEN = 19 + 4 * 8 |
ACTION_IDX = list(range(ACTION_LEN)) |
def __init__(self, args={}): |
self.env_map = {} |
self.env = None |
self.limit_steps = args.get('limit_steps', 100000) |
self.frame_skip = args.get('frame_skip', 0) |
self.reset_common() |
def reset_common(self): |
self.finished = False |
self.prev_score = [0, 0] |
self.reset_flag = False |
self.checkpoint = [ |
[0.95, 0.85, 0.75, 0.65, 0.55, 0.45, 0.35, 0.25, 0.15, 0.05], |
[0.95, 0.85, 0.75, 0.65, 0.55, 0.45, 0.35, 0.25, 0.15, 0.05] |
] |
self.states = [] |
self.half_step = 1500 |
self.reserved_action = [None, None] |
def reset(self, args={}): |
if len(self.env_map) == 0: |
from gfootball.env import football_action_set |
from gfootball.env.wrappers import Simple115StateWrapper |
from kaggle_environments import make |
self.ACTION_STR = football_action_set.action_set_v1 |
self.ACTION2STR = {i: j for i, j in enumerate(football_action_set.action_set_v1)} |
self.STR2ACTION = {j: i for i, j in self.ACTION2STR.items()} |
self.env_map["real"] = make("football", configuration={"scenario_name": "11_vs_11_kaggle"}) |
self.env_map["eval"] = make("football", configuration={"scenario_name": "11_vs_11_kaggle_1000_500"}) |
self.env_map["train"] = make("football", configuration={"scenario_name": "11_vs_11_kaggle_train"}) |
role = args.get('role', '') |
limit_rate = args.get('limit_rate', 1) |
if role == 'g': |
self.env = self.env_map['train' if limit_rate < 0.95 else 'real'] |
elif role == 'e': |
self.env = self.env_map['eval'] |
else: |
self.env = self.env_map['real'] |
state = self.env.reset() |
self.resets_info(state) |
def resets_info(self, state): |
self.reset_common() |
state = copy.deepcopy(state) |
state = [self._preprocess_state(s) for s in state] |
self.states.append(state) |
self.half_step = state[0]['observation']['players_raw'][0]['steps_left'] // 2 |
def reset_info(self, state): |
self.resets_info(state) |
def chance(self): |
pass |
def action2str(self, a: int): |
return str(a) |
def str2action(self, s: str): |
return int(s) |
def plays(self, actions): |
self._plays(actions) |
def _plays(self, actions): |
actions = copy.deepcopy(actions) |
for i, res_action in enumerate(self.reserved_action): |
if res_action is not None: |
actions[i] = res_action |
for i, action in enumerate(actions): |
atomic_a, reserved_a = self.special_to_actions(action) |
actions[i] = atomic_a |
self.reserved_action[i] = reserved_a |
state = self.env.step([[actions[0]], [actions[1]]]) |
state = copy.deepcopy(state) |
state = [self._preprocess_state(s) for s in state] |
self.states.append(state) |
if state[0]['status'] == 'DONE' or len(self.states) > self.limit_steps: |
self.finished = True |
def plays_info(self, state): |
state = copy.deepcopy(state) |
state = [self._preprocess_state(s) for s in state] |
self.states.append(state) |
def play_info(self, state): |
self.plays_info(state) |
def diff_info(self): |
return self.states[-1] |
def turns(self): |
return self.players() |
def players(self): |
return [0, 1] |
def terminal(self): |
return self.finished |
def reward(self): |
prev_score = self.prev_score |
score = self.score() |
rs = [] |
scored_player = None |
for p in self.players(): |
r = 1.0 * (score[p] - prev_score[p]) - 1.0 * (score[1 - p] - prev_score[1 - p]) |
rs.append(r) |
if r != 0: |
self.reset_flag = True |
scored_player = p |
self.prev_score = self.score() |
return rs |
def get_goal_distance(xy1): |
return (((xy1 - np.array([1, 0])) ** 2).sum(axis=-1)) ** 0.5 |
checkpoint_reward = [] |
for p in self.players(): |
obs = self.raw_observation(p)['players_raw'][0] |
ball_owned_team = obs['ball_owned_team'] |
if ball_owned_team == p and len(self.checkpoint[p]) != 0: |
ball = obs['ball'][:2] |
goal_distance = get_goal_distance(ball) |
if goal_distance < self.checkpoint[p][0]: |
cr = 0 |
for idx, c in enumerate(self.checkpoint[p]): |
if goal_distance < c: |
cr += 0.1 |
else: |
break |
self.checkpoint[p] = self.checkpoint[p][idx:] |
checkpoint_reward.append(cr) |
else: |
checkpoint_reward.append(0) |
else: |
checkpoint_reward.append(0) |
if scored_player is not None: |
checkpoint_reward[scored_player] += len( |
self.checkpoint[scored_player] |
) * 0.1 |
self.checkpoint[scored_player] = [] |
return [rs[p] + checkpoint_reward[p] for p in self.players()] |
def is_reset_state(self): |
if self.reset_flag: |
self.reset_flag = False |
return True |
return False |
def score(self): |
if len(self.states) == 0: |
return [0, 0] |
obs = self.states[-1] |
return [ |
obs[0]['observation']['players_raw'][0]['score'][0], obs[1]['observation']['players_raw'][0]['score'][0] |
] |
def outcome(self): |
if len(self.states) == 0: |
return [0, 0] |
scores = self.score() |
if scores[0] > scores[1]: |
score_diff = scores[0] - scores[1] |
outcome_tanh = np.tanh(score_diff ** 0.8) |
return [outcome_tanh, -outcome_tanh] |
elif scores[0] < scores[1]: |
score_diff = scores[1] - scores[0] |
outcome_tanh = np.tanh(score_diff ** 0.8) |
return [-outcome_tanh, outcome_tanh] |
return [0, 0] |
def legal_actions(self, player): |
all_actions = [i for i in copy.copy(self.ACTION_IDX) if i != 19] |
if len(self.states) == 0: |
return all_actions |
obs = self.raw_observation(player)['players_raw'][0] |
illegal_actions = set() |
ball_owned_team = obs['ball_owned_team'] |
if ball_owned_team != 0: |
illegal_actions.add(int(Action.LongPass)) |
illegal_actions.add(int(Action.HighPass)) |
illegal_actions.add(int(Action.ShortPass)) |
illegal_actions.add(int(Action.Shot)) |
illegal_actions.add(int(Action.Dribble)) |
for d in range(8): |
illegal_actions.add(KICK_ACTIONS[Action.LongPass] + d) |
illegal_actions.add(KICK_ACTIONS[Action.HighPass] + d) |
illegal_actions.add(KICK_ACTIONS[Action.ShortPass] + d) |
illegal_actions.add(KICK_ACTIONS[Action.Shot] + d) |
else: |
illegal_actions.add(int(Action.Slide)) |
sticky_actions = obs['sticky_actions'] |
if type(sticky_actions) == set: |
sticky_actions = [0] * 10 |
if sticky_actions[action_to_sticky_index[Action.Sprint]] == 0: |
illegal_actions.add(int(Action.ReleaseSprint)) |
if sticky_actions[action_to_sticky_index[Action.Dribble]] == 0: |
illegal_actions.add(int(Action.ReleaseDribble)) |
if 1 not in sticky_actions[:8]: |
illegal_actions.add(int(Action.ReleaseDirection)) |
return [a for a in all_actions if a not in illegal_actions] |
def action_length(self): |
return self.ACTION_LEN |
def raw_observation(self, player): |
if len(self.states) > 0: |
return self.states[-1][player]['observation'] |
else: |
def observation(self, player): |
info = {'half_step': self.half_step} |
return feature_from_states(self.states, info, player) |
def _preprocess_state(self, player_state): |
if player_state is None: |
return player_state |
o = player_state['observation']['players_raw'][0] |
mode = o['game_mode'] |
if mode == GameMode.FreeKick or \ |
mode == GameMode.Corner or \ |
mode == GameMode.Penalty or \ |
mode == GameMode.GoalKick: |
def dist(xy1, xy2): |
return ((xy1[0] - xy2[0]) ** 2 + (xy1[1] - xy2[1]) ** 2) ** 0.5 |
team_player_position = [(0, i, p) for i, p in enumerate(o['left_team'])] + \ |
[(1, i, p) for i, p in enumerate(o['right_team'])] |
distances = [(t[0], t[1], dist(t[2], o['ball'][:2])) for t in team_player_position] |
distances = sorted(distances, key=lambda x: x[2]) |
o['ball_owned_team'] = distances[0][0] |
o['ball_owned_player'] = distances[0][1] |
if len(player_state['action']) == 0: |
player_state['action'].append(0) |
return player_state |
def special_to_actions(self, saction): |
if not 0 <= saction < 52: |
return [0, None] |
for a, index in KICK_ACTIONS.items(): |
if index <= saction < index + 8: |
return [a, Action(saction - index + 1)] |
return [saction, None] |
'''def action_to_specials(self, action): |
p = np.zeros(self.action_length()) |
p[action] = 1 |
sticky_direction = |
if action == Action.LongPass: |
return |
return p / p.sum()''' |
def funcname(self, parameter_list): |
""" |
docstring |
""" |
pass |
def net(self): |
return FootballNet |
def rule_based_action(self, player): |
return 19 |
if __name__ == '__main__': |
e = Environment() |
net = e.net()(e) |
net.eval() |
for _ in range(1): |
e.reset() |
o = e.observation(0) |
net.inference(o, None) |
while not e.terminal(): |
_ = e.observation(0) |
_ = e.observation(1) |
print(e.env.configuration.episodeSteps) |
print(e.raw_observation(0)['players_raw'][0]['steps_left']) |
action_list = [0, 0] |
action_list[0] = random.choice(e.legal_actions(0)) |
action_list[1] = e.rule_based_action_C(1) |
print(len(e.states), action_list) |
e.plays(action_list) |
print(e.checkpoint) |
print(e.reward()) |
print(e) |
print(e.score()) |
print(e.outcome()) |