|
import numpy as np
|
|
import itertools
|
|
import random
|
|
from make_env import GridWorldEnv
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
|
|
class Algorithm_Agent():
|
|
def __init__(self, num_categories, grid_size, grid, loc):
|
|
self.num_categories = num_categories
|
|
self.grid_size = grid_size
|
|
self.grid = grid
|
|
self.current_loc = [loc[0], loc[1]]
|
|
self.path, self.path_category = self.arrange_points()
|
|
|
|
self.actions = self.plan_action()
|
|
|
|
|
|
def calculate_length(self, path, category_path, elim_path):
|
|
lengths = np.sum(np.abs(np.array(path[:-1]) - np.array(path[1:])), axis=1)
|
|
motion_length = np.sum(lengths)
|
|
cum_lengths = np.cumsum(lengths)[::-1] / 14.4
|
|
load_length = np.sum(cum_lengths) - 4 * np.sum(np.array(cum_lengths) * np.array(elim_path[:-1]))
|
|
|
|
return motion_length + load_length
|
|
|
|
|
|
def get_elim_path(self, category_path):
|
|
elim_path = [0] * len(category_path)
|
|
for i in range(len(category_path)):
|
|
if i > 0:
|
|
previous_caterogy_path = category_path[:i]
|
|
|
|
same_category_count = previous_caterogy_path.count(category_path[i])
|
|
if (same_category_count + 1) % 4 == 0 and same_category_count != 0:
|
|
elim_path[i] = 1
|
|
return elim_path
|
|
|
|
|
|
def find_shortest_path(self, points):
|
|
min_path = None
|
|
min_length = float('inf')
|
|
for perm in itertools.permutations(points):
|
|
perm = np.array(perm)
|
|
|
|
diffs = np.abs(perm[1:] - perm[:-1])
|
|
length = np.sum(diffs)
|
|
if length < min_length:
|
|
min_length = length
|
|
min_path = perm.tolist()
|
|
return min_path, min_length
|
|
|
|
def insert_point(self, path, category_path, elim_path, point, category):
|
|
min_length = float('inf')
|
|
best_position = None
|
|
for i in range(len(path) + 1):
|
|
new_path, new_category_path = path.copy(), category_path.copy()
|
|
new_path.insert(i, point)
|
|
new_category_path.insert(i, category)
|
|
new_elim_path = self.get_elim_path(new_category_path)
|
|
if len(new_path) > 12:
|
|
a=1
|
|
length = self.calculate_length(new_path, new_category_path, new_elim_path)
|
|
if length < min_length:
|
|
min_length = length
|
|
best_position = i
|
|
return best_position
|
|
|
|
def try_single_optimization(self, args):
|
|
"""
|
|
将函数改造为接收单个参数的形式,便于进程池调用
|
|
"""
|
|
path, category_path = args
|
|
path = path.copy()
|
|
category_path = category_path.copy()
|
|
|
|
|
|
index = random.randint(0, len(path) - 1)
|
|
point = path.pop(index)
|
|
category = category_path.pop(index)
|
|
|
|
|
|
elim_path = self.get_elim_path(category_path)
|
|
position = self.insert_point(path, category_path, elim_path, point, category)
|
|
|
|
|
|
path.insert(position, point)
|
|
category_path.insert(position, category)
|
|
|
|
return (path, category_path,
|
|
self.calculate_length(path, category_path, self.get_elim_path(category_path)))
|
|
|
|
def optimize_path_parallel(self, initial_path, initial_category_path, num_iterations=1000):
|
|
"""
|
|
新增的并行优化函数
|
|
"""
|
|
chunk_size = 125
|
|
num_processes = num_iterations // chunk_size
|
|
|
|
|
|
args_list = [(initial_path.copy(), initial_category_path.copy())
|
|
for _ in range(num_iterations)]
|
|
|
|
best_path, best_category_path = initial_path.copy(), initial_category_path.copy()
|
|
best_length = float('inf')
|
|
|
|
|
|
with ProcessPoolExecutor(max_workers=num_processes) as executor:
|
|
|
|
results = list(executor.map(self.try_single_optimization,
|
|
args_list,
|
|
chunksize=chunk_size))
|
|
|
|
|
|
for path, category_path, length in results:
|
|
if length < best_length:
|
|
best_length = length
|
|
best_path = path
|
|
best_category_path = category_path
|
|
|
|
return best_path, best_category_path
|
|
|
|
def arrange_points(self):
|
|
points_by_category = {i: [] for i in random.sample(range(self.num_categories), self.num_categories)}
|
|
for x in range(self.grid_size[0]):
|
|
for y in range(self.grid_size[1]):
|
|
category = self.grid[x, y]
|
|
if category != -1:
|
|
points_by_category[category].append([x, y])
|
|
|
|
path = []
|
|
category_path = []
|
|
for category, points in points_by_category.items():
|
|
while points:
|
|
if len(points) >= 4:
|
|
subset = points[:4]
|
|
points = points[4:]
|
|
else:
|
|
subset = points
|
|
points = []
|
|
|
|
if len(path) == 0:
|
|
path, _ = self.find_shortest_path(subset)
|
|
category_path = [category] * len(path)
|
|
else:
|
|
for point in subset:
|
|
elim_path = self.get_elim_path(category_path)
|
|
position = self.insert_point(path, category_path, elim_path, point, category)
|
|
path.insert(position, point)
|
|
category_path.insert(position, category)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
path, category_path = self.optimize_path_parallel(path, category_path)
|
|
|
|
return path, category_path
|
|
|
|
def plan_action(self):
|
|
actions = []
|
|
for i in range(len(self.path)):
|
|
while self.current_loc[0] != self.path[i][0] or self.current_loc[1] != self.path[i][1]:
|
|
if self.current_loc[0] < self.path[i][0]:
|
|
actions.append(0)
|
|
self.current_loc = [self.current_loc[0] + 1, self.current_loc[1]]
|
|
elif self.current_loc[1] < self.path[i][1]:
|
|
actions.append(1)
|
|
self.current_loc = [self.current_loc[0], self.current_loc[1] + 1]
|
|
elif self.current_loc[0] > self.path[i][0]:
|
|
actions.append(2)
|
|
self.current_loc = [self.current_loc[0] - 1, self.current_loc[1]]
|
|
else:
|
|
actions.append(3)
|
|
self.current_loc = [self.current_loc[0], self.current_loc[1] - 1]
|
|
actions.append(4)
|
|
|
|
return actions
|
|
|
|
def search(grid, loc, pred_grid, pred_loc, num_iterations=30):
|
|
env = GridWorldEnv()
|
|
optim_actions, optim_reward = None, 0
|
|
for i in range(num_iterations):
|
|
env.reset()
|
|
env.grid, env.loc = grid.copy(), loc.copy()
|
|
agent = Algorithm_Agent(env.num_categories, env.grid_size, pred_grid, pred_loc)
|
|
cumulated_reward = 0
|
|
for action in agent.actions:
|
|
obs, reward, done, truncated, info = env.step(action)
|
|
cumulated_reward += reward
|
|
if cumulated_reward > optim_reward:
|
|
optim_actions, optim_reward = agent.actions, cumulated_reward
|
|
print(f'{i}:', cumulated_reward)
|
|
print(f'Optim reward: {optim_reward}')
|
|
return optim_actions
|
|
|
|
|
|
if __name__ == "__main__":
|
|
for _ in range(20):
|
|
test_env = GridWorldEnv()
|
|
test_env.reset()
|
|
grid, loc = test_env.grid.copy(), test_env.loc.copy()
|
|
pred_grid, pred_loc = test_env.grid.copy(), test_env.loc.copy()
|
|
loc_1, loc_2, loc_3, loc_4, loc_5 = random.sample(range(12), 2), random.sample(range(12), 2), random.sample(range(12), 2), random.sample(range(12), 2), random.sample(range(12), 2)
|
|
a, b, c, d, e = pred_grid[loc_1[0], loc_1[1]], pred_grid[loc_2[0], loc_2[1]], pred_grid[loc_3[0], loc_3[1]], pred_grid[loc_4[0], loc_4[1]], pred_grid[loc_5[0], loc_5[1]]
|
|
pred_grid[loc_1[0], loc_1[1]], pred_grid[loc_2[0], loc_2[1]], pred_grid[loc_3[0], loc_3[1]], pred_grid[loc_4[0], loc_4[1]], pred_grid[loc_5[0], loc_5[1]] = b, e, a, c, d
|
|
search(grid, loc, pred_grid, pred_loc)
|
|
|