import copy
import random
import numpy as np
import gym
from typing import Any, Dict, Optional, Union, List
from ding.envs import BaseEnv, BaseEnvTimestep
from ding.utils import ENV_REGISTRY
from ding.torch_utils import to_ndarray


@ENV_REGISTRY.register('bitflip')
class BitFlipEnv(BaseEnv):
    """
    Bit-flip environment: flip one bit per step until the state matches the goal.
    The observation is the concatenation of the current state and the goal.
    """

    def __init__(self, cfg: dict) -> None:
        self._cfg = cfg
        self._n_bits = cfg.n_bits
        self._state = np.zeros(self._n_bits)
        self._goal = np.zeros(self._n_bits)
        self._curr_step = 0
        # Episodes are truncated after at most n_bits flips.
        self._maxsize = self._n_bits
        self._eval_episode_return = 0
        self._observation_space = gym.spaces.Box(low=0, high=1, shape=(2 * self._n_bits, ), dtype=np.float32)
        self._action_space = gym.spaces.Discrete(self._n_bits)
        self._reward_space = gym.spaces.Box(low=0.0, high=1.0, shape=(1, ), dtype=np.float32)

    def reset(self) -> np.ndarray:
        self._curr_step = 0
        self._eval_episode_return = 0
        if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
            random_seed = 100 * random.randint(1, 1000)
            np.random.seed(self._seed + random_seed)
        elif hasattr(self, '_seed'):
            np.random.seed(self._seed)
        self._state = np.random.randint(0, 2, size=(self._n_bits, )).astype(np.float32)
        self._goal = np.random.randint(0, 2, size=(self._n_bits, )).astype(np.float32)
        # Resample the goal until it differs from the initial state,
        # so every episode starts unsolved.
        while (self._state == self._goal).all():
            self._goal = np.random.randint(0, 2, size=(self._n_bits, )).astype(np.float32)
        obs = np.concatenate([self._state, self._goal], axis=0)
        return obs

    def close(self) -> None:
        pass

    def check_success(self, state: np.ndarray, goal: np.ndarray) -> bool:
        # Compare the given state and goal rather than the internal attributes.
        return bool((state == goal).all())

    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
        self._seed = seed
        self._dynamic_seed = dynamic_seed
        random.seed(self._seed)

    def step(self, action: np.ndarray) -> BaseEnvTimestep:
        # Flip the selected bit.
        self._state[action] = 1 - self._state[action]
        if self.check_success(self._state, self._goal):
            rew = np.array([1]).astype(np.float32)
            done = True
        else:
            rew = np.array([0]).astype(np.float32)
            done = False
        self._eval_episode_return += float(rew)
        # Truncate the episode once the step budget (n_bits flips) is used up.
        if self._curr_step >= self._maxsize - 1:
            done = True
        info = {}
        if done:
            info['eval_episode_return'] = self._eval_episode_return
        self._curr_step += 1
        obs = np.concatenate([self._state, self._goal], axis=0)
        return BaseEnvTimestep(obs, rew, done, info)

    def random_action(self) -> np.ndarray:
        random_action = self.action_space.sample()
        random_action = to_ndarray([random_action], dtype=np.int64)
        return random_action

    @property
    def observation_space(self) -> gym.spaces.Space:
        return self._observation_space

    @property
    def action_space(self) -> gym.spaces.Space:
        return self._action_space

    @property
    def reward_space(self) -> gym.spaces.Space:
        return self._reward_space

    def __repr__(self) -> str:
        return "DI-engine BitFlip Env({})".format('bitflip')