""" |
Overview: |
This code implements the Monte Carlo Tree Search (MCTS) algorithm with the integration of neural networks. |
The Node class represents a node in the Monte Carlo tree and implements the basic functionalities expected in a node. |
The MCTS class implements the specific search functionality and provides the optimal action through the ``get_next_action`` method. |
Compared to traditional MCTS, the introduction of value networks and policy networks brings several advantages. |
During the expansion of nodes, it is no longer necessary to explore every single child node, but instead, |
the child nodes are directly selected based on the prior probabilities provided by the neural network. |
This reduces the breadth of the search. When estimating the value of leaf nodes, there is no need for a rollout; |
instead, the value output by the neural network is used, which saves the depth of the search. |
""" |
import math |
from typing import List, Tuple, Union, Callable, Type, Dict, Any |
import numpy as np |
import torch |
from ding.envs import BaseEnv |
from easydict import EasyDict |
from numpy import ndarray |
from lzero.mcts.ptree.ptree_sez import Action |
class Node(object): |
""" |
Overview: |
A class for a node in a Monte Carlo Tree. The properties of this class store basic information about the node, |
such as its parent node, child nodes, and the number of times the node has been visited. |
The methods of this class implement basic functionalities that a node should have, such as propagating the value back, |
checking if the node is the root node, and determining if it is a leaf node. |
""" |
def __init__(self, parent: "Node" = None, prior_p: float = 1.0) -> None: |
""" |
Overview: |
Initialize a Node object. |
Arguments: |
- parent (:obj:`Node`): The parent node of the current node. |
- prior_p (:obj:`Float`): The prior probability of selecting this node. |
""" |
self._parent = parent |
self._children = {} |
self._visit_count = 0 |
self._value_sum = 0 |
self.prior_p = prior_p |
@property |
def value(self) -> float: |
""" |
Overview: |
The value of the current node. |
Returns: |
- output (:obj:`Int`): Current value, used to compute ucb score. |
""" |
if self._visit_count == 0: |
return 0 |
return self._value_sum / self._visit_count |
def update(self, value: float) -> None: |
""" |
Overview: |
Update the current node information, such as ``_visit_count`` and ``_value_sum``. |
Arguments: |
- value (:obj:`Float`): The value of the node. |
""" |
self._visit_count += 1 |
self._value_sum += value |
def update_recursive(self, leaf_value: float, battle_mode_in_simulation_env: str) -> None: |
""" |
Overview: |
Update node information recursively. |
The same game state has opposite values in the eyes of two players playing against each other. |
The value of a node is evaluated from the perspective of the player corresponding to its parent node. |
In ``self_play_mode``, because the player corresponding to a node changes every step during the backpropagation process, the value needs to be negated once. |
In ``play_with_bot_mode``, since all nodes correspond to the same player, the value does not need to be negated. |
Arguments: |
- leaf_value (:obj:`Float`): The value of the node. |
- battle_mode_in_simulation_env (:obj:`str`): The mode of MCTS, can be 'self_play_mode' or 'play_with_bot_mode'. |
""" |
if battle_mode_in_simulation_env == 'self_play_mode': |
self.update(leaf_value) |
if self.is_root(): |
return |
self._parent.update_recursive(-leaf_value, battle_mode_in_simulation_env) |
if battle_mode_in_simulation_env == 'play_with_bot_mode': |
self.update(leaf_value) |
if self.is_root(): |
return |
self._parent.update_recursive(leaf_value, battle_mode_in_simulation_env) |
def is_leaf(self) -> bool: |
""" |
Overview: |
Check if the current node is a leaf node or not. |
Returns: |
- output (:obj:`Bool`): If self._children is empty, it means that the node has not |
been expanded yet, which indicates that the node is a leaf node. |
""" |
return self._children == {} |
def is_root(self) -> bool: |
""" |
Overview: |
Check if the current node is a root node or not. |
Returns: |
- output (:obj:`Bool`): If the node does not have a parent node, |
then it is a root node. |
""" |
return self._parent is None |
@property |
def parent(self) -> None: |
""" |
Overview: |
Get the parent node of the current node. |
Returns: |
- output (:obj:`Node`): The parent node of the current node. |
""" |
return self._parent |
@property |
def children(self) -> None: |
""" |
Overview: |
Get the dictionary of children nodes of the current node. |
Returns: |
- output (:obj:`dict`): A dictionary representing the children of the current node. |
""" |
return self._children |
@property |
def visit_count(self) -> None: |
""" |
Overview: |
Get the number of times the current node has been visited. |
Returns: |
- output (:obj:`Int`): The number of times the current node has been visited. |
""" |
return self._visit_count |
class MCTS(object): |
""" |
Overview: |
A class for Monte Carlo Tree Search (MCTS). The methods in this class implement the steps involved in MCTS, such as selection and expansion. |
Based on this, the ``_simulate`` method is used to traverse from the root node to a leaf node. |
Finally, by repeatedly calling ``_simulate`` through ``get_next_action``, the optimal action is obtained. |
""" |
def __init__(self, cfg: EasyDict, simulate_env: Type[BaseEnv]) -> None: |
""" |
Overview: |
Initializes the MCTS process. |
Arguments: |
- cfg (:obj:`EasyDict`): A dictionary containing the configuration parameters for the MCTS process. |
""" |
self._cfg = cfg |
self.legal_actions = self._cfg.legal_actions |
self.action_space_size = self._cfg.action_space_size |
self.num_of_sampled_actions = self._cfg.num_of_sampled_actions |
print(f'num_of_sampled_actions: {self.num_of_sampled_actions}') |
self.continuous_action_space = self._cfg.continuous_action_space |
self._max_moves = self._cfg.get('max_moves', 512) |
self._num_simulations = self._cfg.get('num_simulations', 800) |
self._pb_c_base = self._cfg.get('pb_c_base', 19652) |
self._pb_c_init = self._cfg.get('pb_c_init', 1.25) |
self._root_dirichlet_alpha = self._cfg.get( |
'root_dirichlet_alpha', 0.3 |
) |
self._root_noise_weight = self._cfg.get('root_noise_weight', 0.25) |
self.mcts_search_cnt = 0 |
self.simulate_env = simulate_env |
def get_next_action( |
self, |
state_config_for_env_reset: Dict[str, Any], |
policy_value_func: Callable, |
temperature: float = 1.0, |
sample: bool = True |
) -> Tuple[int, List[float]]: |
""" |
Overview: |
Get the next action to take based on the current state of the game. |
Arguments: |
- state_config_for_env_reset (:obj:`Dict`): The config of state when reset the env. |
- policy_value_func (:obj:`Function`): The Callable to compute the action probs and state value. |
- temperature (:obj:`Float`): The exploration temperature. |
- sample (:obj:`Bool`): Whether to sample an action from the probabilities or choose the most probable action. |
Returns: |
- action (:obj:`Int`): The selected action to take. |
- action_probs (:obj:`List`): The output probability of each action. |
""" |
self.root = Node() |
self.simulate_env.reset( |
start_player_index=state_config_for_env_reset.start_player_index, |
init_state=state_config_for_env_reset.init_state, |
) |
self._expand_leaf_node(self.root, self.simulate_env, policy_value_func) |
if sample: |
self._add_exploration_noise(self.root) |
for n in range(self._num_simulations): |
self.simulate_env.reset( |
start_player_index=state_config_for_env_reset.start_player_index, |
init_state=state_config_for_env_reset.init_state, |
) |
self.simulate_env.battle_mode = self.simulate_env.battle_mode_in_simulation_env |
self._simulate(self.root, self.simulate_env, policy_value_func) |
action_visits = [] |
for action in range(self.simulate_env.action_space.n): |
current_action_object = Action(action) |
if current_action_object in self.root.children: |
action_visits.append((action, self.root.children[current_action_object].visit_count)) |
else: |
action_visits.append((action, 0)) |
actions, visits = zip(*action_visits) |
visits_t = torch.as_tensor(visits, dtype=torch.float32) |
visits_t /= temperature |
action_probs = (visits_t / visits_t.sum()).numpy() |
if sample: |
action = np.random.choice(actions, p=action_probs) |
else: |
action = actions[np.argmax(action_probs)] |
self.mcts_search_cnt += 1 |
self.root_sampled_actions = np.nonzero(action_probs)[0] |
return action, action_probs |
def _simulate(self, node: Node, simulate_env: Type[BaseEnv], policy_value_func: Callable) -> None: |
""" |
Overview: |
Run a single playout from the root to the leaf, getting a value at the leaf and propagating it back through its parents. |
State is modified in-place, so a deepcopy must be provided. |
Arguments: |
- node (:obj:`Class Node`): Current node when performing mcts search. |
- simulate_env (:obj:`Class BaseGameEnv`): The class of simulate env. |
- policy_value_func (:obj:`Function`): The Callable to compute the action probs and state value. |
""" |
while not node.is_leaf(): |
action, node = self._select_child(node, simulate_env) |
if action is None: |
break |
simulate_env.step(action.value) |
done, winner = simulate_env.get_done_winner() |
""" |
in ``self_play_mode``, the leaf_value is calculated from the perspective of player ``simulate_env.current_player``. |
in ``play_with_bot_mode``, the leaf_value is calculated from the perspective of player 1. |
""" |
if not done: |
leaf_value = self._expand_leaf_node(node, simulate_env, policy_value_func) |
else: |
if simulate_env.battle_mode_in_simulation_env == 'self_play_mode': |
if winner == -1: |
leaf_value = 0 |
else: |
leaf_value = 1 if simulate_env.current_player == winner else -1 |
if simulate_env.battle_mode_in_simulation_env == 'play_with_bot_mode': |
if winner == -1: |
leaf_value = 0 |
elif winner == 1: |
leaf_value = 1 |
elif winner == 2: |
leaf_value = -1 |
if simulate_env.battle_mode_in_simulation_env == 'play_with_bot_mode': |
node.update_recursive(leaf_value, simulate_env.battle_mode_in_simulation_env) |
elif simulate_env.battle_mode_in_simulation_env == 'self_play_mode': |
node.update_recursive(-leaf_value, simulate_env.battle_mode_in_simulation_env) |
def _select_child(self, node: Node, simulate_env: Type[BaseEnv]) -> Tuple[Union[int, float], Node]: |
""" |
Overview: |
Select the child with the highest UCB score. |
Arguments: |
- node (:obj:`Class Node`): Current node. |
Returns: |
- action (:obj:`Int`): choose the action with the highest ucb score. |
- child (:obj:`Node`): the child node reached by executing the action with the highest ucb score. |
""" |
action = None |
child_node = None |
best_score = -9999999 |
for action_tmp, child_node_tmp in node.children.items(): |
if action_tmp.value in simulate_env.legal_actions: |
score = self._ucb_score(node, child_node_tmp) |
if score > best_score: |
best_score = score |
action = action_tmp |
child_node = child_node_tmp |
else: |
print(f'error: {action_tmp} not in {simulate_env.legal_actions}') |
if child_node is None: |
child_node = node |
if action is None: |
print('error: action is None') |
return action, child_node |
def _expand_leaf_node(self, node: Node, simulate_env: Type[BaseEnv], policy_value_func: Callable) -> float: |
""" |
Overview: |
expand the node with the policy_value_func. |
Arguments: |
- node (:obj:`Class Node`): current node when performing mcts search. |
- simulate_env (:obj:`Class BaseGameEnv`): the class of simulate env. |
- policy_value_func (:obj:`Function`): the Callable to compute the action probs and state value. |
Returns: |
- leaf_value (:obj:`Bool`): the leaf node's value. |
""" |
if self.continuous_action_space: |
pass |
else: |
legal_action_probs_dict, leaf_value = policy_value_func(simulate_env) |
node.legal_actions = [] |
actions = list(legal_action_probs_dict.keys()) |
probabilities = list(legal_action_probs_dict.values()) |
probabilities = np.array(probabilities) |
probabilities /= probabilities.sum() |
num_samples = min(len(actions), self.num_of_sampled_actions) |
sampled_actions = np.random.choice(actions, size=num_samples, p=probabilities, replace=False) |
sampled_actions = sampled_actions.tolist() |
for action_index in range(num_samples): |
node.children[Action(sampled_actions[action_index])] = \ |
Node( |
parent=node, |
prior_p=legal_action_probs_dict[sampled_actions[action_index]], |
) |
node.legal_actions.append(Action(sampled_actions[action_index])) |
return leaf_value |
def _ucb_score(self, parent: Node, child: Node) -> float: |
""" |
Overview: |
Compute UCB score. The score for a node is based on its value, plus an exploration bonus based on the prior. |
For more details, please refer to this paper: http://gauss.ececs.uc.edu/Workshops/isaim2010/papers/rosin.pdf |
UCB = Q(s,a) + P(s,a) \cdot \frac{N(\text{parent})}{1+N(\text{child})} \cdot \left(c_1 + \log\left(\frac{N(\text{parent})+c_2+1}{c_2}\right)\right) |
- Q(s,a): value of a child node. |
- P(s,a): The prior of a child node. |
- N(parent): The number of the visiting of the parent node. |
- N(child): The number of the visiting of the child node. |
- c_1: a parameter given by self._pb_c_init to control the influence of the prior P(s,a) relative to the value Q(s,a). |
- c_2: a parameter given by self._pb_c_base to control the influence of the prior P(s,a) relative to the value Q(s,a). |
Arguments: |
- parent (:obj:`Class Node`): Current node. |
- child (:obj:`Class Node`): Current node's child. |
Returns: |
- score (:obj:`Bool`): The UCB score. |
""" |
pb_c = math.log((parent.visit_count + self._pb_c_base + 1) / self._pb_c_base) + self._pb_c_init |
pb_c *= math.sqrt(parent.visit_count) / (child.visit_count + 1) |
node_prior = "density" |
if node_prior == "uniform": |
prior_score = pb_c * (1 / len(parent.children)) |
elif node_prior == "density": |
if self.continuous_action_space: |
prior_score = pb_c * ( |
torch.exp(child.prior_p) / ( |
sum([torch.exp(node.prior_p) for node in parent.children.values()]) + 1e-6) |
) |
else: |
prior_score = pb_c * (child.prior_p / (sum([node.prior_p for node in parent.children.values()]) + 1e-6)) |
else: |
raise ValueError("{} is unknown prior option, choose uniform or density") |
value_score = child.value |
prior_score = pb_c * child.prior_p |
return prior_score + value_score |
def _add_exploration_noise(self, node: Node) -> None: |
""" |
Overview: |
Add exploration noise. |
Arguments: |
- node (:obj:`Class Node`): Current node. |
""" |
actions = list(node.children.keys()) |
alpha = [self._root_dirichlet_alpha] * len(actions) |
noise = np.random.dirichlet(alpha) |
frac = self._root_noise_weight |
for a, n in zip(actions, noise): |
node.children[a].prior_p = node.children[a].prior_p * (1 - frac) + n * frac |
def get_sampled_actions(self) -> List[int]: |
""" |
Overview: |
Get the sampled actions of the root node. |
Returns: |
- output (:obj:`List`): The sampled actions of the root node. |
""" |
return self.root_sampled_actions |