grg's picture
Cleaned old git history
be5548b
from gym_minigrid.minigrid import *
from gym_minigrid.register import register
import time
from collections import deque
class Thief(NPC):
"""
A dancing NPC that the agent has to copy
NPC executes a sequence of movement and utterances
"""
def __init__(self, color, name, env, thief_pos, hidden_npc=False, tag_visible_coins=False, view_size=5, look_around=True):
super().__init__(color)
self.name = name
self.npc_type = 0
self.env = env
self.cur_pos = thief_pos
self.init_pos = thief_pos
self.view_size = view_size
self.npc_dir = self._look_at_agent() # Thief stares at its victim
self.init_dir = self.npc_dir
self.actions = self.env.possible_actions
self.tag_visible_coins = tag_visible_coins
self.nb_given_coins = None
self.look_around = look_around
if self.look_around: # randomly select in which direction NPC is looking around
if np.random.random() > 0.5: # will look left
self.look = self.rotate_left
self.look_back = self.rotate_right
else: # will look right
self.look = self.rotate_right
self.look_back = self.rotate_left
self.nb_seen_coins = self._count_coins() # This is how much coins Thief wants, at least
self.add_npc_direction = True
self.nb_steps = 0
self.hidden_npc = hidden_npc
def step(self, agent_action, agent_utterance):
agent_disobeyed = False
agent_gave_coins = False
utterance = None
if self.nb_steps == 0:
utterance = "Freeze! Give me all the coins you have!"
if self.nb_steps >= 0 and self.look_around:
if self.npc_dir == self.init_dir: # start to look around
self.look()
else: # resume looking to agent
self.look_back()
if not(agent_utterance is None):
self.nb_given_coins = int(agent_utterance[-2])
if self.nb_given_coins >= self.nb_seen_coins:
agent_gave_coins = True
else: # agent gave not enough coins
agent_disobeyed = True
# The thief forbids the agent to move, apart from looking around (rotating)
if not (np.isnan(agent_action) or agent_action == self.actions.left or agent_action == self.actions.right):
agent_disobeyed = True
self.nb_steps += 1
return agent_disobeyed, agent_gave_coins, utterance
def get_status_str(self):
return "thief sees: {} \n agent gives: {}".format(self.nb_seen_coins, self.nb_given_coins)
def _count_coins(self):
# get seen coins
coins_pos = self.get_pos_visible_coins()
if self.look_around:
self.look()
# add coins visible from this new direction
coins_pos += self.get_pos_visible_coins()
# remove coins that we already saw
if len(coins_pos) > 0:
coins_pos = np.unique(coins_pos, axis=0).tolist()
self.look_back()
return len(coins_pos)
def _look_at_agent(self):
npc_dir = None
ax, ay = self.env.agent_pos
tx, ty = self.cur_pos
delta_x, delta_y = ax - tx, ay - ty
if delta_x == 1:
npc_dir = 0
elif delta_x == -1:
npc_dir = 2
elif delta_y == 1:
npc_dir = 1
elif delta_y == -1:
npc_dir = 3
else:
raise NotImplementedError
return npc_dir
def gen_npc_obs_grid(self):
"""
Generate the sub-grid observed by the npc.
This method also outputs a visibility mask telling us which grid
cells the npc can actually see.
"""
view_size = self.view_size
topX, topY, botX, botY = self.env.get_view_exts(dir=self.npc_dir, view_size=view_size, pos=self.cur_pos)
grid = self.env.grid.slice(topX, topY, view_size, view_size)
for i in range(self.npc_dir + 1):
grid = grid.rotate_left()
# Process occluders and visibility
# Note that this incurs some performance cost
if not self.env.see_through_walls:
vis_mask = grid.process_vis(agent_pos=(view_size // 2, view_size - 1))
else:
vis_mask = np.ones(shape=(grid.width, grid.height), dtype=np.bool)
# Make it so the agent sees what it's carrying
# We do this by placing the carried object at the agent's position
# in the agent's partially observable view
# agent_pos = grid.width // 2, grid.height - 1
# if self.carrying:
# grid.set(*agent_pos, self.carrying)
# else:
# grid.set(*agent_pos, None)
return grid, vis_mask
def get_pos_visible_coins(self):
"""
Generate the npc's view (partially observable, low-resolution encoding)
return the list of unique visible coins
"""
grid, vis_mask = self.gen_npc_obs_grid()
coins_pos = []
for obj in grid.grid:
if isinstance(obj, Ball):
coins_pos.append(obj.cur_pos)
if self.tag_visible_coins:
obj.tag()
return coins_pos
def can_overlap(self):
# If the NPC is hidden, agent can overlap on it
return self.hidden_npc
class CoinThiefGrammar(object):
templates = ["Here is"]
things = ["0","1","2","3","4","5","6"]
grammar_action_space = spaces.MultiDiscrete([len(templates), len(things)])
@classmethod
def construct_utterance(cls, action):
return cls.templates[int(action[0])] + " " + cls.things[int(action[1])] + " "
@classmethod
def random_utterance(cls):
return np.random.choice(cls.templates) + " " + np.random.choice(cls.things) + " "
class ThiefActions(IntEnum):
# Turn left, turn right, move forward
left = 0
right = 1
forward = 2
class CoinThiefEnv(MultiModalMiniGridEnv):
"""
Environment in which the agent is instructed to go to a given object
named using an English text string
"""
def __init__(
self,
size=5,
hear_yourself=False,
diminished_reward=True,
step_penalty=False,
hidden_npc=False,
max_steps=20,
full_obs=False,
few_actions=False,
tag_visible_coins=False,
nb_coins=6,
npc_view_size=5,
npc_look_around=True
):
assert size >= 5
self.empty_symbol = "NA \n"
self.hear_yourself = hear_yourself
self.diminished_reward = diminished_reward
self.step_penalty = step_penalty
self.hidden_npc = hidden_npc
self.few_actions = few_actions
self.possible_actions = ThiefActions if self.few_actions else MiniGridEnv.Actions
self.nb_coins = nb_coins
self.tag_visible_coins = tag_visible_coins
self.npc_view_size = npc_view_size
self.npc_look_around = npc_look_around
if max_steps is None:
max_steps = 5*size**2
super().__init__(
grid_size=size,
max_steps=max_steps,
# Set this to True for maximum speed
see_through_walls=True,
full_obs=full_obs,
actions=MiniGridEnv.Actions,
action_space=spaces.MultiDiscrete([
len(self.possible_actions),
*CoinThiefGrammar.grammar_action_space.nvec
]),
add_npc_direction=True
)
print({
"size": size,
"hear_yourself": hear_yourself,
"diminished_reward": diminished_reward,
"step_penalty": step_penalty,
})
def _gen_grid(self, width, height):
# Create the grid
self.grid = Grid(width, height, nb_obj_dims=4)
# Randomly vary the room width and height
# width = self._rand_int(5, width+1)
# height = self._rand_int(5, height+1)
# Generate the surrounding walls
self.grid.wall_rect(0, 0, width, height)
# Generate the surrounding walls
self.grid.wall_rect(0, 0, width, height)
# Randomize the agent's start position and orientation
self.place_agent(size=(width, height))
#Β Get possible near-agent positions, and place thief in one of them
ax, ay = self.agent_pos
near_agent_pos = [[ax, ay + 1], [ax, ay - 1], [ax - 1, ay], [ax + 1, ay]]
# get empty cells positions
available_pos = []
for p in near_agent_pos:
if self.grid.get(*p) is None:
available_pos.append(p)
thief_pos = self._rand_elem(available_pos)
# Add randomly placed coins
# Types and colors of objects we can generate
types = ['ball']
objs = []
objPos = []
# Until we have generated all the objects
while len(objs) < self.nb_coins:
objType = self._rand_elem(types)
objColor = 'yellow'
if objType == 'ball':
obj = Ball(objColor)
else:
raise NotImplementedError
pos = self.place_obj(obj, reject_fn=lambda env,pos: pos.tolist() == thief_pos)
objs.append((objType, objColor))
objPos.append(pos)
# Set a randomly coloured Thief NPC next to the agent
color = self._rand_elem(COLOR_NAMES)
self.thief = Thief(color, "Eve", self, thief_pos,
hidden_npc=self.hidden_npc,
tag_visible_coins=self.tag_visible_coins,
view_size=self.npc_view_size,
look_around=self.npc_look_around)
self.grid.set(*thief_pos, self.thief)
# Generate the mission string
self.mission = 'save as much coins as possible'
# Dummy beginning string
self.beginning_string = "This is what you hear. \n"
self.utterance = self.beginning_string
# utterance appended at the end of each step
self.utterance_history = ""
# used for rendering
self.conversation = self.utterance
self.outcome_info = None
def step(self, action):
p_action = action[0] if np.isnan(action[0]) else int(action[0])
if len(action) == 1: # agent cannot speak
utterance_action = [np.nan, np.nan]
else:
utterance_action = action[1:]
obs, reward, done, info = super().step(p_action)
# assert all nan or neither nan
assert len(set(np.isnan(utterance_action))) == 1
speak_flag = not all(np.isnan(utterance_action))
if speak_flag:
utterance = CoinThiefGrammar.construct_utterance(utterance_action)
self.conversation += "{}: {} \n".format("Agent", utterance)
# Don't let the agent open any doors
if not self.few_actions and p_action == self.actions.toggle:
done = True
if not self.few_actions and p_action == self.actions.done:
done = True
# npc's turn
agent_disobeyed, agent_gave_coins, npc_utterance = self.thief.step(p_action, utterance if speak_flag else None)
if self.hidden_npc:
npc_utterance = None
if npc_utterance:
self.utterance += "{} \n".format(npc_utterance)
self.conversation += "{}: {} \n".format(self.thief.name, npc_utterance)
if agent_disobeyed:
done = True
if agent_gave_coins:
done = True
if self.thief.nb_seen_coins == self.thief.nb_given_coins:
reward = self._reward()
self.outcome_info = "SUCCESS: agent got {} reward \n".format(np.round(reward,1))
if done and reward == 0:
self.outcome_info = "FAILURE: agent got {} reward \n".format(reward)
# discount
if self.step_penalty:
reward = reward - 0.01
if self.hidden_npc:
# remove npc from agent view
npc_obs_idx = np.argwhere(obs['image'] == 11)
if npc_obs_idx.size != 0: # agent sees npc
obs['image'][npc_obs_idx[0][0], npc_obs_idx[0][1], :] = [1, 0, 0, 0]
# fill observation with text
self.append_existing_utterance_to_history()
obs = self.add_utterance_to_observation(obs)
self.reset_utterance()
return obs, reward, done, info
def _reward(self):
if self.diminished_reward:
return super()._reward()
else:
return 1.0
def render(self, *args, **kwargs):
obs = super().render(*args, **kwargs)
print("conversation:\n", self.conversation)
print("utterance_history:\n", self.utterance_history)
self.window.clear_text() # erase previous text
self.window.set_caption(self.conversation) # overwrites super class caption
self.window.ax.set_title(self.thief.get_status_str(), loc="left")
if self.outcome_info:
color = None
if "SUCCESS" in self.outcome_info:
color = "lime"
elif "FAILURE" in self.outcome_info:
color = "red"
self.window.add_text(*(0.01, 0.85, self.outcome_info),
**{'fontsize':15, 'color':color, 'weight':"bold"})
self.window.show_img(obs) # re-draw image to add changes to window
return obs
class CoinThief8x8Env(CoinThiefEnv):
def __init__(self, **kwargs):
super().__init__(size=8, **kwargs)
class CoinThief6x6Env(CoinThiefEnv):
def __init__(self, **kwargs):
super().__init__(size=6, **kwargs)
register(
id='MiniGrid-CoinThief-5x5-v0',
entry_point='gym_minigrid.envs:CoinThiefEnv'
)
register(
id='MiniGrid-CoinThief-6x6-v0',
entry_point='gym_minigrid.envs:CoinThief6x6Env'
)
register(
id='MiniGrid-CoinThief-8x8-v0',
entry_point='gym_minigrid.envs:CoinThief8x8Env'
)