# File size: 9,917 Bytes (revision a821f69)
import numpy as np
import itertools
import random
from make_env import GridWorldEnv
from concurrent.futures import ProcessPoolExecutor
class Algorithm_Agent():
    """Heuristic planner that orders the items of a grid into a visiting path.

    On construction the agent groups every non-empty grid cell (value != -1)
    by category, builds a visiting order by greedy insertion followed by a
    randomized remove-and-reinsert search, and converts that order into a
    flat list of integer actions stored in ``self.actions``.
    """

    def __init__(self, num_categories, grid_size, grid, loc):
        """Build the path and the action list for the given grid.

        Args:
            num_categories: Number of item categories; grid values are
                expected to be ``-1`` (empty) or in ``range(num_categories)``.
            grid_size: Pair ``(rows, cols)`` used to scan ``grid``.
            grid: 2-D array indexed as ``grid[x, y]``.
            loc: Starting location; only ``loc[0]`` and ``loc[1]`` are read.
        """
        self.num_categories = num_categories
        self.grid_size = grid_size
        self.grid = grid
        self.current_loc = [loc[0], loc[1]]
        self.path, self.path_category = self.arrange_points()
        self.actions = self.plan_action()

    def calculate_length(self, path, category_path, elim_path):
        """Return the cost of visiting ``path`` in order.

        The cost is the total Manhattan distance plus a "load" term built
        from the reversed cumulative distances, reduced at the positions
        flagged in ``elim_path``.

        Args:
            path: Sequence of ``[x, y]`` points.
            category_path: Unused; kept for interface compatibility.
            elim_path: 0/1 flags, one per point (see ``get_elim_path``).

        Returns:
            The scalar cost; ``0.0`` when the path has fewer than two points
            (there is no segment to travel).
        """
        if len(path) < 2:
            # No segments: the vectorised code below would fail on an empty
            # 1-D array when summing over axis 1.
            return 0.0
        coords = np.asarray(path)
        lengths = np.sum(np.abs(coords[:-1] - coords[1:]), axis=1)
        motion_length = np.sum(lengths)
        # NOTE(review): 14.4 and the factor 4 below are unexplained tuning
        # constants in the original code - confirm their meaning.
        cum_lengths = np.cumsum(lengths)[::-1] / 14.4
        load_length = np.sum(cum_lengths) - 4 * np.sum(cum_lengths * np.array(elim_path[:-1]))
        return motion_length + load_length

    def get_elim_path(self, category_path):
        """Flag every position that completes a group of four of its category.

        Position ``i`` gets ``1`` when ``category_path[i]`` is the 4th, 8th,
        12th, ... occurrence of that category so far; every other position
        gets ``0``.
        """
        seen = {}  # running occurrence count per category
        elim_path = []
        for category in category_path:
            count = seen.get(category, 0) + 1
            seen[category] = count
            elim_path.append(1 if count % 4 == 0 else 0)
        return elim_path

    def find_shortest_path(self, points):
        """Brute-force the ordering of ``points`` with the least Manhattan length.

        All permutations are tried, so this is only meant for very small
        inputs (the class calls it with at most four points).

        Returns:
            ``(min_path, min_length)`` where ``min_path`` is a list of points.
        """
        min_path = None
        min_length = float('inf')
        for perm in itertools.permutations(points):
            candidate = np.array(perm)
            length = np.sum(np.abs(candidate[1:] - candidate[:-1]))
            if length < min_length:
                min_length = length
                min_path = candidate.tolist()
        return min_path, min_length

    def insert_point(self, path, category_path, elim_path, point, category):
        """Return the index at which inserting ``point`` gives the lowest cost.

        Neither ``path`` nor ``category_path`` is modified. ``elim_path`` is
        unused (flags are recomputed for each candidate) and is kept only for
        interface compatibility.
        """
        min_length = float('inf')
        best_position = None
        for i in range(len(path) + 1):
            new_path, new_category_path = path.copy(), category_path.copy()
            new_path.insert(i, point)
            new_category_path.insert(i, category)
            new_elim_path = self.get_elim_path(new_category_path)
            length = self.calculate_length(new_path, new_category_path, new_elim_path)
            if length < min_length:
                min_length = length
                best_position = i
        return best_position

    def try_single_optimization(self, args):
        """Remove one random point and re-insert it at its best position.

        Takes a single tuple argument so it can be mapped over a process pool.

        Args:
            args: ``(path, category_path)``; both are copied, not mutated.

        Returns:
            ``(new_path, new_category_path, cost)``.
        """
        path, category_path = args
        path = path.copy()
        category_path = category_path.copy()
        if not path:
            # Nothing to move; random.randint(0, -1) would raise.
            return path, category_path, 0.0
        # Pick a random point and take it out of the path.
        index = random.randint(0, len(path) - 1)
        point = path.pop(index)
        category = category_path.pop(index)
        # Find where it is cheapest to put it back.
        elim_path = self.get_elim_path(category_path)
        position = self.insert_point(path, category_path, elim_path, point, category)
        path.insert(position, point)
        category_path.insert(position, category)
        return (path, category_path,
                self.calculate_length(path, category_path, self.get_elim_path(category_path)))

    def optimize_path_parallel(self, initial_path, initial_category_path, num_iterations=1000):
        """Run ``num_iterations`` independent single-move trials in a process pool.

        Every trial starts from the same initial path and applies one
        remove-and-reinsert move; the cheapest resulting path is returned.
        NOTE(review): moves are not accumulated across trials, so the result
        differs from the initial path by at most one move.

        Returns:
            ``(best_path, best_category_path)``.
        """
        best_path, best_category_path = initial_path.copy(), initial_category_path.copy()
        if len(initial_path) < 2 or num_iterations <= 0:
            # A 0/1-point path cannot be improved, and there is no work to
            # submit; avoid starting a pool at all.
            return best_path, best_category_path

        chunk_size = 125
        # ProcessPoolExecutor rejects max_workers <= 0, which the plain
        # integer division yields whenever num_iterations < chunk_size.
        num_processes = max(1, num_iterations // chunk_size)
        args_list = [(initial_path.copy(), initial_category_path.copy())
                     for _ in range(num_iterations)]
        best_length = float('inf')
        # The bound method is sent to the workers, so ``self`` is pickled.
        with ProcessPoolExecutor(max_workers=num_processes) as executor:
            results = list(executor.map(self.try_single_optimization,
                                        args_list,
                                        chunksize=chunk_size))
        # Keep the cheapest candidate (first one wins on ties).
        for path, category_path, length in results:
            if length < best_length:
                best_length = length
                best_path = path
                best_category_path = category_path
        return best_path, best_category_path

    def arrange_points(self):
        """Build the visiting order for all items on the grid.

        Categories are processed in random order, four points at a time. The
        very first batch is ordered by brute force; every later point is
        inserted greedily at its cheapest position. The result is then passed
        through ``optimize_path_parallel``.

        Returns:
            ``(path, category_path)`` - the points and their categories.
        """
        # Group item positions by category; key order is randomised.
        points_by_category = {i: [] for i in random.sample(range(self.num_categories), self.num_categories)}
        for x in range(self.grid_size[0]):
            for y in range(self.grid_size[1]):
                category = self.grid[x, y]
                if category != -1:
                    points_by_category[category].append([x, y])

        path = []
        category_path = []
        for category, points in points_by_category.items():
            while points:
                # Take up to four points of this category per round.
                subset, points = points[:4], points[4:]
                if not path:
                    # Nothing placed yet: order the first batch exhaustively.
                    path, _ = self.find_shortest_path(subset)
                    category_path = [category] * len(path)
                else:
                    for point in subset:
                        elim_path = self.get_elim_path(category_path)
                        position = self.insert_point(path, category_path, elim_path, point, category)
                        path.insert(position, point)
                        category_path.insert(position, category)

        # Refine the greedy result with the parallel re-insertion search.
        path, category_path = self.optimize_path_parallel(path, category_path)
        return path, category_path

    def plan_action(self):
        """Translate ``self.path`` into a list of integer actions.

        Moves are emitted one grid step at a time: 0 = x+1, 1 = y+1,
        2 = x-1, 3 = y-1. After each target point is reached a 4 is appended
        (presumably the pick/collect action - confirm against the
        environment). ``self.current_loc`` ends up at the last target.
        """
        actions = []
        for target in self.path:
            tx, ty = target[0], target[1]
            while self.current_loc[0] != tx or self.current_loc[1] != ty:
                cx, cy = self.current_loc
                if cx < tx:
                    actions.append(0)
                    self.current_loc = [cx + 1, cy]
                elif cy < ty:
                    actions.append(1)
                    self.current_loc = [cx, cy + 1]
                elif cx > tx:
                    actions.append(2)
                    self.current_loc = [cx - 1, cy]
                else:
                    actions.append(3)
                    self.current_loc = [cx, cy - 1]
            actions.append(4)
        return actions
def search(grid, loc, pred_grid, pred_loc, num_iterations=30):
    """Plan repeatedly on the predicted grid and keep the best-scoring plan.

    Each iteration resets a fresh environment state to ``grid``/``loc``,
    builds a new ``Algorithm_Agent`` from ``pred_grid``/``pred_loc`` (planning
    is randomized, so plans differ between iterations), replays the agent's
    actions in the environment and sums the rewards.

    Args:
        grid: True grid; copied into ``env.grid`` each iteration.
        loc: True start location; copied into ``env.loc`` each iteration.
        pred_grid: Grid the agent plans on.
        pred_loc: Start location the agent plans from.
        num_iterations: Number of plans to generate and evaluate.

    Returns:
        The action list with the highest cumulative reward, or ``None`` when
        ``num_iterations`` is 0.
    """
    env = GridWorldEnv()
    # Start from -inf, not 0: otherwise a run where every plan scores <= 0
    # would return None instead of the best plan found.
    optim_actions, optim_reward = None, float('-inf')
    for i in range(num_iterations):
        env.reset()
        env.grid, env.loc = grid.copy(), loc.copy()
        agent = Algorithm_Agent(env.num_categories, env.grid_size, pred_grid, pred_loc)
        cumulated_reward = 0
        for action in agent.actions:
            # NOTE(review): the done/truncated flags are ignored, so stepping
            # continues after the episode ends - confirm this is intended.
            _obs, reward, _done, _truncated, _info = env.step(action)
            cumulated_reward += reward
        if cumulated_reward > optim_reward:
            optim_actions, optim_reward = agent.actions, cumulated_reward
        print(f'{i}:', cumulated_reward)
    print(f'Optim reward: {optim_reward}')
    return optim_actions
if __name__ == "__main__":
    # Run 20 independent trials; in each one the agent plans on a copy of the
    # true grid in which the contents of five random cells have been shuffled.
    for _ in range(20):
        test_env = GridWorldEnv()
        test_env.reset()
        grid, loc = test_env.grid.copy(), test_env.loc.copy()
        pred_grid, pred_loc = test_env.grid.copy(), test_env.loc.copy()

        # Five random cells. NOTE(review): 12 is hard-coded (presumably the
        # grid side length - confirm), and random.sample yields two distinct
        # values, so cells with x == y are never chosen.
        cells = [random.sample(range(12), 2) for _ in range(5)]
        original_values = [pred_grid[cell[0], cell[1]] for cell in cells]

        # Cell k receives the value originally stored in cell source_order[k]
        # (i.e. the five values a, b, c, d, e are written back as b, e, a, c, d).
        source_order = (1, 4, 0, 2, 3)
        for cell, source in zip(cells, source_order):
            pred_grid[cell[0], cell[1]] = original_values[source]

        search(grid, loc, pred_grid, pred_loc)  # search using the grid with 5 shuffled cells
|