File size: 5,175 Bytes
5e6fb83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import gymnasium
from gymnasium import spaces
import numpy as np

# Environment definition: a 12x12 grid in which the agent moves in four
# directions and collects randomly placed items. Items fall into 21
# categories, encoded as the ints in range(21), and the number of items in
# each category is a multiple of 4.

class GridWorldEnv(gymnasium.Env):
    """Grid world in which an agent walks a 12x12 board and collects items.

    Every cell initially holds one item whose category is an int in
    ``range(num_categories)``; a collected cell is marked ``-1``. Collected
    items go into a per-category bag, and as soon as a category reaches four
    items those four are removed from the bag for a bonus.

    Actions (``Discrete(5)``): 0 down, 1 right, 2 up, 3 left, 4 collect.

    Observation: a flat ``float64`` vector made of the flattened grid, the
    agent's ``(row, col)`` location and the flattened bag counts.
    """

    # 'render.modes' is the key this file originally declared; 'render_modes'
    # is the key current Gymnasium reads.
    metadata = {'render.modes': ['human'], 'render_modes': ['human']}

    def __init__(self):
        """Set up the spaces and episode limits, then create a first episode."""
        super().__init__()
        self.grid_size = (12, 12)
        self.num_categories = 21
        self.action_space = spaces.Discrete(5)  # 0: down, 1: right, 2: up, 3: left, 4: collect
        total_cells = self.grid_size[0] * self.grid_size[1]
        # Grid cells range over -1 (empty) .. num_categories - 1, which also
        # bounds the location and bag entries that share the vector.
        self.observation_space = spaces.Box(
            low=-1,
            high=self.num_categories - 1,
            shape=(total_cells + 2 + self.num_categories,),
            dtype=np.float64,
        )
        self.max_steps = 4 * total_cells
        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Optional seed for the environment's random generator; when
                given, the generated grid and start location are reproducible.
            options: Accepted for Gymnasium API compatibility; not used here.
        """
        # Seeds self.np_random, which drives all randomness in this class.
        super().reset(seed=seed, options=options)
        self.grid = self.initialize_grid()
        self.loc = self.np_random.integers(0, self.grid_size, size=2)
        self.bag = np.zeros((self.num_categories, 1), dtype=int)
        self.steps = 0
        return self._get_obs(), self._get_info()

    def initialize_grid(self):
        """Return a fully populated grid with randomly placed items.

        Each of the first ``num_categories - 1`` categories gets 4 or 8 items
        (capped by the cells still free); the last category takes whatever
        cells remain, which may be none.
        """
        rng = self.np_random
        total_cells = self.grid_size[0] * self.grid_size[1]
        remaining_cells = total_cells
        category_counts = []
        for _ in range(self.num_categories - 1):
            count = min(int(rng.integers(1, 3)) * 4, remaining_cells)
            category_counts.append(count)
            remaining_cells -= count
        # Assign the remaining cells to the last category
        category_counts.append(remaining_cells)

        # Scatter the categories over a random permutation of the flat
        # (row-major) cell indices; the counts sum to total_cells.
        grid = np.zeros(self.grid_size, dtype=int)
        labels = np.repeat(np.arange(self.num_categories), category_counts)
        grid.flat[rng.permutation(total_cells)] = labels
        return grid

    def step(self, action):
        """Apply ``action`` and advance the episode by one step.

        Returns:
            ``(observation, reward, done, truncated, info)``. ``done`` is True
            once every cell has been collected; ``truncated`` is True when the
            step limit is reached without finishing.
        """
        action = int(action)
        self.steps += 1
        total_cells = self.grid_size[0] * self.grid_size[1]
        # Per-step cost that grows with the number of items held in the bag.
        reward = -0.1 - np.sum(self.bag) / total_cells
        if action == 0:  # down
            self.loc[0] = min(self.loc[0] + 1, self.grid_size[0] - 1)
        elif action == 1:  # right
            self.loc[1] = min(self.loc[1] + 1, self.grid_size[1] - 1)
        elif action == 2:  # up
            self.loc[0] = max(self.loc[0] - 1, 0)
        elif action == 3:  # left
            self.loc[1] = max(self.loc[1] - 1, 0)
        elif action == 4:  # collect
            row, col = self.loc
            category = self.grid[row, col]
            if category == -1:
                reward -= 2  # Penalty for collecting an empty cell
            else:
                self.bag[category] += 1
                self.grid[row, col] = -1  # Remove the item from the grid

        # When the bag holds 4 items of one category, remove those 4 items
        # and grant a reward of +1.
        full_sets = self.bag == 4
        if np.any(full_sets):
            self.bag[np.argmax(full_sets)] -= 4
            reward += 1

        done = bool(np.all(self.grid == -1))  # Done when all items are collected
        if done:
            reward += 100
        # Finishing on the very last allowed step counts as success, so the
        # truncation penalty must not overwrite the completion bonus.
        truncated = bool(not done and self.steps >= self.max_steps)
        if truncated:
            # Penalty for every item still in the bag or left on the grid
            reward = -3 * (np.sum(self.bag) + np.sum(self.grid != -1))

        return self._get_obs(), float(reward), done, truncated, self._get_info()

    def _get_obs(self):
        """Return grid, location and bag as one flat ``float64`` vector."""
        flat = np.concatenate((self.grid.flatten(), self.loc, self.bag.flatten()))
        # Cast so the observation matches the dtype declared in observation_space.
        return flat.astype(np.float64)

    def _get_info(self):
        """Return scaled copies of the state: grid / 20, loc / 12, bag / 4."""
        return {'grid': np.copy(self.grid) / 20, 'loc': np.copy(self.loc) / 12, 'bag': np.copy(self.bag) / 4}

    def render(self, mode='rgb_array'):
        """Print the grid, location and bag when ``mode == 'human'``.

        Any other mode, including the default, produces no output.
        """
        if mode == 'human':
            print("Grid:")
            print(self.grid)
            print("Loc:", self.loc)
            print("Bag:", self.bag)

    def close(self):
        """Nothing to release."""
        pass

if __name__ == "__main__":
    # Example usage: reset() returns (obs, info) and step() returns
    # (obs, reward, done, truncated, info), so unpack all of them.
    env = GridWorldEnv()
    obs, info = env.reset()
    # render() only prints in 'human' mode; its default mode shows nothing.
    env.render(mode='human')
    obs, reward, done, truncated, info = env.step(1)  # Move right
    env.render(mode='human')
    obs, reward, done, truncated, info = env.step(0)  # Move down
    env.render(mode='human')
    obs, reward, done, truncated, info = env.step(4)  # Collect item
    env.render(mode='human')