grg's picture
Cleaned old git history
be5548b
raw
history blame
20.8 kB
import time
import random
import numpy as np
from gym_minigrid.minigrid import *
from gym_minigrid.register import register
from gym_minigrid.social_ai_envs.socialaigrammar import SocialAIGrammar, SocialAIActions, SocialAIActionSpace
import time
from collections import deque
class Partner(NPC):
"""
A simple NPC that knows who is telling the truth
"""
def __init__(self, color, name, env):
super().__init__(color)
self.name = name
self.env = env
self.npc_dir = 1 # NPC initially looks downward
self.npc_type = 0 # this will be put into the encoding
# opposite role of the agent
self.npc_side = "L" if self.env.agent_side == "R" else "R"
# how many random action at the beginning -> removes trivial solutions
self.random_to_go = random.randint(self.env.lever_active_steps, 10)
assert set([self.npc_side, self.env.agent_side]) == {"L", "R"}
self.was_introduced_to = False
self.ate_an_apple = False
self.pressed_the_lever = False
self.pushed_the_generator = False
self.toggling = False
# target obj
assert self.env.problem == self.env.parameters["Problem"] if self.env.parameters else "MarblePass"
self.target_obj = self.env.generator
assert self.env.grammar.contains_utterance(self.introduction_statement)
def step(self, utterance):
reply, info = super().step()
if self.env.hidden_npc:
return reply, info
reply, action = self.handle_introduction(utterance)
if self.was_introduced_to:
if self.random_to_go > 0:
action = random.choice([self.go_forward, self.rotate_left, self.rotate_right])
self.random_to_go -= 1
elif self.npc_side == "L":
if not self.pressed_the_lever:
# is the NPC next to the lever
if np.abs(self.env.lever.cur_pos - self.cur_pos).sum() > 1:
# go to the lever
action = self.path_to_pos(self.env.lever.cur_pos)
else:
# look at the agent
wanted_dir = self.compute_wanted_dir(self.env.agent_pos)
if wanted_dir != self.npc_dir and not self.toggling:
# turn to look at the agent
action = self.compute_turn_action(wanted_dir)
else:
# check if the agent is next to the door
if np.abs(self.env.door.cur_pos - self.env.agent_pos).sum() <= 1:
self.toggling = True
action = self.path_to_toggle_pos(self.env.lever.cur_pos)
elif self.npc_side == "R":
if not self.pushed_the_generator:
# go to generator and push it
action = self.path_to_pos(self.env.generator.cur_pos)
if action is None:
# the door is not open, no paths exist
action = self.path_to_pos(self.env.door.cur_pos)
else:
raise ValueError("Undefined role")
eaten_before = self.env.partner_apple.eaten
lever_on_before = self.env.lever.is_on
generator_pushed_before = self.env.generator.is_pressed
if action is not None:
action()
# check if the NPC ate the apple
if not self.ate_an_apple:
self.ate_an_apple = not eaten_before and self.env.partner_apple.eaten
if not self.pressed_the_lever:
self.pressed_the_lever = not lever_on_before and self.env.lever.is_on
if not self.pushed_the_generator:
self.pushed_the_generator = not generator_pushed_before and self.env.generator.is_pressed
info = {
"prim_action": action.__name__ if action is not None else "no_op",
"utterance": reply or "no_op",
"was_introduced_to": self.was_introduced_to
}
assert (reply or "no_op") in self.list_of_possible_utterances
return reply, info
class LeverDoorEnv(MultiModalMiniGridEnv):
"""
Environment in which the agent is instructed to go to a given object
named using an English text string
"""
def __init__(
self,
size=10,
diminished_reward=True,
step_penalty=False,
knowledgeable=False,
max_steps=80,
hidden_npc=False,
lever_active_steps=10,
reward_diminish_factor=0.1,
egocentric_observation=True,
):
assert size >= 5
self.empty_symbol = "NA \n"
self.diminished_reward = diminished_reward
self.step_penalty = step_penalty
self.knowledgeable = knowledgeable
self.hidden_npc = hidden_npc
self.hear_yourself = False
self.grammar = SocialAIGrammar()
self.init_done = False
# parameters - to be set in reset
self.parameters = None
# encoding size should be 5
self.add_npc_direction = True
self.add_npc_point_direction = True
self.add_npc_last_prim_action = True
self.lever_active_steps = lever_active_steps
self.reward_diminish_factor = reward_diminish_factor
self.egocentric_observation = egocentric_observation
self.encoding_size = 3 + 2*bool(not self.egocentric_observation) + bool(self.add_npc_direction) + bool(self.add_npc_point_direction) + bool(self.add_npc_last_prim_action)
super().__init__(
grid_size=size,
max_steps=max_steps,
# Set this to True for maximum speed
see_through_walls=False,
actions=SocialAIActions, # primitive actions
action_space=SocialAIActionSpace,
add_npc_direction=self.add_npc_direction,
add_npc_point_direction=self.add_npc_point_direction,
add_npc_last_prim_action=self.add_npc_last_prim_action,
reward_diminish_factor=self.reward_diminish_factor,
)
self.all_npc_utterance_actions = Partner.get_list_of_possible_utterances()
self.prim_actions_dict = SocialAINPCActionsDict
def _gen_grid(self, width_, height_):
# Create the grid
self.grid = Grid(width_, height_, nb_obj_dims=self.encoding_size)
min_w = min(9, width_)
min_h = min(9, height_)
self.current_width = self._rand_int(min_w, width_+1)
self.current_height = self._rand_int(min_h, height_+1)
# print("Room size: {}x{}".format(self.current_width, self.current_height))
# previous
# self.current_width = self._rand_int(6, width_+1)
# self.current_height = self._rand_int(6, height_+1)
# original
# self.current_width = self._rand_int(5, width_+1)
# self.current_height = self._rand_int(5, height_+1)
# self.current_width = width_
# self.current_height = height_
# self.current_width = 8
# self.current_height = 8
# warnings.warn("env size fixed: {}x{}".format(self.current_width, self.current_height))
self.wall_x = self.current_width-1
self.wall_y = self.current_height-1
# Generate the surrounding walls
self.grid.wall_rect(0, 0, self.current_width, self.current_height)
self.problem = self.parameters["Problem"] if self.parameters else "LeverDoor"
self.version = self.parameters["Version"] if self.parameters else "Asocial"
self.role = self.parameters["Role"] if self.parameters else "A"
assert self.role in ["A", "B", "Meta"]
if self.role in ["B", "Meta"]:
self.agent_side = "R" # starts on the right side
else:
self.agent_side = "L" # starts on the right side
num_of_colors = self.parameters.get("Num_of_colors", None) if self.parameters else None
self.add_obstacles()
# apple
if num_of_colors is None:
POSSIBLE_COLORS = COLOR_NAMES
else:
POSSIBLE_COLORS = COLOR_NAMES[:int(num_of_colors)]
self.left_half_size = (self.current_width//2, self.current_height)
self.left_half_top = (0, 0)
self.right_half_size = (self.current_width//2, self.current_height)
self.right_half_top = (self.current_width - self.current_width // 2, 0)
# generator
self.generator_pos = (self.current_width//2, self.current_height)
self.generator_color = self._rand_elem(POSSIBLE_COLORS)
self.generator_current_pos = self.find_loc(
# on the right wall
top=(self.current_width-1, 1),
size=(1, self.current_height-2),
reject_agent_pos=True,
reject_taken_pos=False, # so that it can be placed on the wall
)
# create hole in the wall for the generator
assert type(self.grid.get(*self.generator_current_pos)) == Wall
self.grid.set(*self.generator_current_pos, None)
# add fence to grid
self.grid.vert_wall(
x=self.current_width//2,
y=1,
length=self.current_height - 2,
obj_type=Fence
)
# door in front of generator
self.door_current_pos = self.generator_current_pos - np.array([1, 0])
self.door_color = self._rand_elem(POSSIBLE_COLORS)
# lever
self.lever_current_pos = self.find_loc(
top=self.left_half_top, size=self.left_half_size, reject_agent_pos=True,
reject_fn=lambda _, pos: tuple(pos) in map(tuple, [
self.door_current_pos])
)
self.lever_color = self._rand_elem(POSSIBLE_COLORS)
# generator platform
# find the position for generator_platforms
self.left_apple_current_pos = self.find_loc(
top=self.left_half_top, size=self.left_half_size, reject_agent_pos=True,
reject_fn=lambda _, pos: tuple(pos) in map(tuple, [
self.generator_current_pos, self.door_current_pos, self.lever_current_pos])
)
self.right_apple_current_pos = self.find_loc(
top=self.right_half_top, size=self.right_half_size, reject_agent_pos=True,
reject_fn=lambda _, pos: tuple(pos) in map(tuple, [
self.generator_current_pos, self.door_current_pos, self.lever_current_pos])
)
assert all(self.left_apple_current_pos < np.array([self.current_width - 1, self.current_height - 1]))
assert all(self.right_apple_current_pos < np.array([self.current_width - 1, self.current_height - 1]))
self.agent_generator_platform_color = self._rand_elem(POSSIBLE_COLORS)
self.partner_generator_platform_color = self._rand_elem(POSSIBLE_COLORS)
self.put_objects_in_env()
# agent
if self.agent_side == "L":
self.place_agent(size=self.left_half_size, top=self.left_half_top)
else:
self.place_agent(size=self.right_half_size, top=self.right_half_top)
if self.version == "Social":
# NPC
self.npc_color = self._rand_elem(COLOR_NAMES)
self.caretaker = Partner(self.npc_color, "Partner", self)
if self.agent_side == "L":
self.place_obj(self.caretaker, size=self.right_half_size, top=self.right_half_top)
else:
self.place_obj(self.caretaker, size=self.left_half_size, top=self.left_half_top)
# Generate the mission string
self.mission = 'lets collaborate'
# Dummy beginning string
# self.beginning_string = "This is what you hear. \n"
self.beginning_string = "Conversation: \n"
self.utterance = self.beginning_string
# utterance appended at the end of each step
self.utterance_history = ""
# used for rendering
self.full_conversation = self.utterance
self.outcome_info = None
def put_objects_in_env(self, remove_objects=False):
assert self.left_apple_current_pos is not None
assert self.right_apple_current_pos is not None
assert self.generator_current_pos is not None
assert self.agent_generator_platform_color is not None
assert self.partner_generator_platform_color is not None
assert self.problem == self.parameters["Problem"] if self.parameters else "MarblePass"
if remove_objects:
self.grid.set(*self.agent_generator_platform.cur_pos, None) # remove apple
self.grid.set(*self.partner_generator_platform.cur_pos, None) # remove apple
self.grid.set(*self.generator.cur_pos, None) # remove generator
self.grid.set(*self.door.cur_pos, None) # remove door
self.grid.set(*self.lever.cur_pos, None) # remove lever
# apple
self.agent_apple = Apple()
self.partner_apple = Apple()
def generate_apples():
if self.agent_side == "L":
self.grid.set(*self.left_apple_current_pos, self.agent_apple),
self.grid.set(*self.right_apple_current_pos, self.partner_apple),
else:
self.grid.set(*self.left_apple_current_pos, self.partner_apple),
self.grid.set(*self.right_apple_current_pos, self.agent_apple),
# Generator
self.generator = AppleGenerator(
self.generator_color,
on_push=generate_apples,
)
door_open = self.version == "Asocial"
self.door = RemoteDoor(color=self.door_color, is_open=door_open)
self.lever = Lever(color=self.lever_color, object=self.door, active_steps=self.lever_active_steps)
self.agent_generator_platform = GeneratorPlatform(self.agent_generator_platform_color)
self.partner_generator_platform = GeneratorPlatform(self.partner_generator_platform_color)
self.put_obj_np(self.agent_generator_platform, self.left_apple_current_pos)
self.put_obj_np(self.partner_generator_platform, self.right_apple_current_pos)
self.put_obj_np(self.generator, self.generator_current_pos)
self.put_obj_np(self.door, self.door_current_pos)
self.put_obj_np(self.lever, self.lever_current_pos)
def reset(
self, *args, **kwargs
):
# This env must be used inside the parametric env
if not kwargs:
# The only place when kwargs can empty is during the class construction
# reset should be called again before using the env (paramenv does it in its constructor)
assert self.parameters is None
assert not self.init_done
self.init_done = True
obs = super().reset()
return obs
else:
assert self.init_done
self.parameters = dict(kwargs)
assert self.parameters is not None
assert len(self.parameters) > 0
obs = super().reset()
self.agent_ate_the_apple = False
self.agent_opened_the_box = False
self.agent_turned_on_the_switch = False
self.agent_pressed_the_generator = False
return obs
def step(self, action):
success = False
p_action = action[0]
utterance_action = action[1:]
apple_had_been_eaten = self.agent_apple.eaten
generator_had_been_pressed = self.generator.is_pressed
# primitive actions
_, reward, done, info = super().step(p_action)
self.lever.step()
# eaten just now by primitive actions of the agent
if not self.agent_ate_the_apple:
self.agent_ate_the_apple = self.agent_apple.eaten and not apple_had_been_eaten
if not self.agent_pressed_the_generator:
self.agent_pressed_the_generator = self.generator.is_pressed and not generator_had_been_pressed
# utterances
agent_spoke = not all(np.isnan(utterance_action))
if agent_spoke:
utterance = self.grammar.construct_utterance(utterance_action)
if self.hear_yourself:
self.utterance += "YOU: {} \n".format(utterance)
self.full_conversation += "YOU: {} \n".format(utterance)
else:
utterance = None
if self.version == "Social":
reply, npc_info = self.caretaker.step(utterance)
if reply:
self.utterance += "{}: {} \n".format(self.caretaker.name, reply)
self.full_conversation += "{}: {} \n".format(self.caretaker.name, reply)
else:
npc_info = {
"prim_action": "no_op",
"utterance": "no_op",
"was_introduced_to": False,
}
# aftermath
if p_action == self.actions.done:
done = True
elif self.agent_ate_the_apple:
# check that it is the agent who ate it
assert self.actions(p_action) == self.actions.toggle
assert self.get_cell(*self.front_pos) == self.agent_apple
if self.version == "Asocial" or self.role in ["A", "B"]:
reward = self._reward()
success = True
done = True
elif self.role == "Meta":
if self.agent_side == "L":
reward = self._reward() / 2
success = True
done = True
elif self.agent_side == "R":
reward = self._reward() / 2
self.agent_ate_the_apple=False
self.agent_side = "L"
self.put_objects_in_env(remove_objects=True)
# teleport the agent and the NPC
self.place_agent(size=self.left_half_size, top=self.left_half_top)
self.grid.set(*self.caretaker.cur_pos, None)
self.caretaker = Partner(self.npc_color, "Partner", self)
self.place_obj(self.caretaker, size=self.right_half_size, top=self.right_half_top)
else:
raise ValueError(f"Side unknown - {self.agent_side}.")
else:
raise ValueError(f"Role unknown - {self.role}.")
# discount
if self.step_penalty:
reward = reward - 0.01
# update obs with NPC movement
obs = self.gen_obs(full_obs=self.full_obs)
# fill observation with text
self.append_existing_utterance_to_history()
obs = self.add_utterance_to_observation(obs)
self.reset_utterance()
if done:
if reward > 0:
self.outcome_info = "SUCCESS: agent got {} reward \n".format(np.round(reward, 1))
else:
self.outcome_info = "FAILURE: agent got {} reward \n".format(reward)
if self.version == "Social":
# is the npc seen by the agent
ag_view_npc = self.relative_coords(*self.caretaker.cur_pos)
if ag_view_npc is not None:
# in the agent's field of view
ag_view_npc_x, ag_view_npc_y = ag_view_npc
n_dims = obs['image'].shape[-1]
npc_encoding = self.caretaker.encode(n_dims)
# is it occluded
npc_observed = all(obs['image'][ag_view_npc_x, ag_view_npc_y] == npc_encoding)
else:
npc_observed = False
else:
npc_observed = False
info = {**info, **{"NPC_"+k: v for k, v in npc_info.items()}}
info["NPC_observed"] = npc_observed
info["success"] = success
return obs, reward, done, info
def _reward(self):
if self.diminished_reward:
return super()._reward()
else:
return 1.0
# def render(self, *args, **kwargs):
# obs = super().render(*args, **kwargs)
# self.window.clear_text() # erase previous text
# self.window.set_caption(self.full_conversation)
#
# # self.window.ax.set_title("correct color: {}".format(self.box.target_color), loc="left", fontsize=10)
#
# if self.outcome_info:
# color = None
# if "SUCCESS" in self.outcome_info:
# color = "lime"
# elif "FAILURE" in self.outcome_info:
# color = "red"
# self.window.add_text(*(0.01, 0.85, self.outcome_info),
# **{'fontsize':15, 'color':color, 'weight':"bold"})
#
# self.window.show_img(obs) # re-draw image to add changes to window
# return obs
register(
id='SocialAI-LeverDoorEnv-v1',
entry_point='gym_minigrid.social_ai_envs:LeverDoorEnv'
)