I am trying to learn the environment I created for a reinforcement learning project with two different classical RL algorithms, Q-learning and Dyna-q. I managed to do it with Q-learning, but when I do it with Dyna-q, it hangs in a loop and I couldn't find why. I shared the necessary environment and agent codes. How can I get out of this loop or make it more effective?

```
def row_col_to_seq(row_col, dim_y):
return row_col[:,0] * dim_y + row_col[:,1]
def seq_to_col_row(seq, dim_y):
r = floor(seq / dim_y)
c = seq - r * dim_y
return np.array([[r, c]])
```

**env codes:**

```
class GridWorld:
"""
Creates a gridworld object to pass to an RL algorithm.
Parameters
----------
num_rows : int
The number of rows in the gridworld.
num_cols : int
The number of cols in the gridworld.
start_state : numpy array of shape (1, 2), np.array([[row, col]])
The start state of the gridworld (can only be one start state)
goal_states : numpy arrany of shape (n, 2)
The goal states for the gridworld where n is the number of goal
states.
"""
def __init__(self, num_rows, num_cols, start_state, goal_states):
self.num_rows = num_rows
self.num_cols = num_cols
self.start_state = start_state
self.goal_states = goal_states
self.obs_states = None
self.bad_states = None
self.num_bad_states = 0
self.p_good_trans = None
self.bias = None
self.r_step = None
self.r_goal = None
self.r_dead = None
self.gamma = 1 # default is no discounting
self.maxSteps = float('inf')
self.changingPoint = None
self.num_actions = 4
self.actions = [0, 1, 2, 3]
self.stateActionValues = np.zeros((self.num_rows, self.num_cols, self.num_actions))
self.resolution = 1
def add_obstructions(self, obstructed_states=None, bad_states=None, restart_states=None):
"""
Add obstructions to the grid world.
Obstructed states: walls that prohibit the agent from entering that state.
Bad states: states that incur a greater penalty than a normal step.
Restart states: states that incur a high penalty and transition the agent
back to the start state (but do not end the episode).
Parameters
----------
obstructed_states : numpy array of shape (n, 2)
States the agent cannot enter where n is the number of obstructed states
and the two columns are the row and col position of the obstructed state.
bad_states: numpy array of shape (n, 2)
States in which the agent incurs high penalty where n is the number of bad
states and the two columns are the row and col position of the bad state.
restart_states: numpy array of shape (n, 2)
States in which the agent incurs high penalty and transitions to the start
state where n is the number of restart states and the two columns are the
row and col position of the restart state.
"""
self.obs_states = obstructed_states
self.bad_states = bad_states
if bad_states is not None:
self.num_bad_states = bad_states.shape[0]
else:
self.num_bad_states = 0
self.restart_states = restart_states
if restart_states is not None:
self.num_restart_states = restart_states.shape[0]
else:
self.num_restart_states = 0
def add_transition_probability(self, p_good_transition, bias):
"""
Add transition probabilities to the grid world.
p_good_transition is the probability that the agent successfully
executes the intended action. The action is then incorrectly executed
with probability 1 - p_good_transition and in this case the agent
transitions to the left of the intended transition with probability
(1 - p_good_transition) * bias and to the right with probability
(1 - p_good_transition) * (1 - bias).
Parameters
----------
p_good_transition : float (in the interval [0,1])
The probability that the agents attempted transition is successful.
bias : float (in the interval [0,1])
The probability that the agent transitions left or right of the
intended transition if the intended transition is not successful.
"""
self.p_good_trans = p_good_transition
self.bias = bias
def add_rewards(self, step_reward, goal_reward, bad_state_reward=None, restart_state_reward = None):
"""
Define which states incur which rewards.
Parameters
----------
step_reward : float
The reward for each step taken by the agent in the grid world.
Typically a negative value (e.g. -1).
goal_reward : float
The reward given to the agent for reaching the goal state.
Typically a middle range positive value (e.g. 10)
bad_state_reward : float
The reward given to the agent for transitioning to a bad state.
Typically a middle range negative value (e.g. -6)
restart_state_reward : float
The reward given to the agent for transitioning to a restart state.
Typically a large negative value (e.g. -100)
"""
self.r_step = step_reward
self.r_goal = goal_reward
self.r_bad = bad_state_reward
self.r_restart = restart_state_reward
def add_discount(self, discount):
"""
Discount rewards so that recent rewards carry more weight than past rewards.
Parameters
----------
discount : float (in the interval [0, 1])
The discount factor.
"""
self.gamma = discount
def create_gridworld(self):
"""
Create the grid world with the specified parameters.
Returns
-------
self : class object
Holds information about the environment to solve
such as the reward structure and the transition dynamics.
"""
self.num_actions = 4
self.num_states = self.num_cols * self.num_rows + 1
self.start_state_seq = row_col_to_seq(self.start_state, self.num_cols)
self.goal_states_seq = row_col_to_seq(self.goal_states, self.num_cols)
# rewards structure
self.R = self.r_step * np.ones((self.num_states, 1))
self.R[self.num_states-1] = 0
self.R[self.goal_states_seq] = self.r_goal
for i in range(self.num_bad_states):
if self.r_bad is None:
raise Exception("Bad state specified but no reward is given")
bad_state = row_col_to_seq(self.bad_states[i,:].reshape(1,-1), self.num_cols)
self.R[bad_state, :] = self.r_bad
for i in range(self.num_restart_states):
if self.r_restart is None:
raise Exception("Restart state specified but no reward is given")
restart_state = row_col_to_seq(self.restart_states[i,:].reshape(1,-1), self.num_cols)
self.R[restart_state, :] = self.r_restart
# probability model
if self.p_good_trans == None:
raise Exception("Must assign probability and bias terms via the add_transition_probability method.")
self.P = np.zeros((self.num_states,self.num_states,self.num_actions))
for action in range(self.num_actions):
for state in range(self.num_states):
# check if state is the fictional end state - self transition
if state == self.num_states-1:
self.P[state, state, action] = 1
continue
# check if the state is the goal state or an obstructed state - transition to end
row_col = seq_to_col_row(state, self.num_cols)
if self.obs_states is not None:
end_states = np.vstack((self.obs_states, self.goal_states))
else:
end_states = self.goal_states
if any(np.sum(np.abs(end_states-row_col), 1) == 0):
self.P[state, self.num_states-1, action] = 1
# else consider stochastic effects of action
else:
for dir in range(-1,2,1):
direction = self._get_direction(action, dir)
next_state = self._get_state(state, direction)
if dir == 0:
prob = self.p_good_trans
elif dir == -1:
prob = (1 - self.p_good_trans)*(self.bias)
elif dir == 1:
prob = (1 - self.p_good_trans)*(1-self.bias)
self.P[state, next_state, action] += prob
# make restart states transition back to the start state with
# probability 1
if self.restart_states is not None:
if any(np.sum(np.abs(self.restart_states-row_col),1)==0):
next_state = row_col_to_seq(self.start_state, self.num_cols)
self.P[state,:,:] = 0
self.P[state,next_state,:] = 1
return self
def _get_direction(self, action, direction):
"""
Takes is a direction and an action and returns a new direction.
Parameters
----------
action : int
The current action 0, 1, 2, 3 for gridworld.
direction : int
Either -1, 0, 1.
Returns
-------
direction : int
Value either 0, 1, 2, 3.
"""
left = [2,3,1,0]
right = [3,2,0,1]
if direction == 0:
new_direction = action
elif direction == -1:
new_direction = left[action]
elif direction == 1:
new_direction = right[action]
else:
raise Exception("getDir received an unspecified case")
return new_direction
def _get_state(self, state, direction):
"""
Get the next_state from the current state and a direction.
Parameters
----------
state : int
The current state.
direction : int
The current direction.
Returns
-------
next_state : int
The next state given the current state and direction.
"""
row_change = [-1,1,0,0]
col_change = [0,0,-1,1]
row_col = seq_to_col_row(state, self.num_cols)
row_col[0,0] += row_change[direction]
row_col[0,1] += col_change[direction]
# check for invalid states
if self.obs_states is not None:
if (np.any(row_col < 0) or
np.any(row_col[:,0] > self.num_rows-1) or
np.any(row_col[:,1] > self.num_cols-1) or
np.any(np.sum(abs(self.obs_states - row_col), 1)==0)):
next_state = state
else:
next_state = row_col_to_seq(row_col, self.num_cols)[0]
else:
if (np.any(row_col < 0) or
np.any(row_col[:,0] > self.num_rows-1) or
np.any(row_col[:,1] > self.num_cols-1)):
next_state = state
else:
next_state = row_col_to_seq(row_col, self.num_cols)[0]
return next_state
```

**Planning model for Dyna-q**

```
class PlanningModel(object):
def __init__(self, environment, time_weight=0):
self.time = 0
self.model = dict()
self.time_weight = time_weight
self.environment = environment
self.rand = np.random.RandomState(0)
def save_experience(self, state, new_direction, next_state, step_reward):
""" Deterministic environment model
"""
self.time += 1
self.create_state_action_value(state, new_direction)
self.model[tuple(state)][new_direction] = [list(next_state), step_reward, self.time]
def sample(self):
stateIndex = self.rand.choice(range(0, len(self.model.keys())))
state = list(self.model.keys())[stateIndex]
actionIndex = self.rand.choice(range(0, len(self.model[state].keys())))
new_direction = list(self.model[state].keys())[actionIndex]
newState, reward, time = self.model[state][new_direction]
# adjust reward with elapsed time since last vist
time_reward = self.time_weight * np.sqrt(self.time - time)
return list(state), new_direction, list(newState), reward, time_reward
def get_time_reward(self, state, new_direction):
self.create_state_action_value(state, new_direction=None)
newState, reward, time = self.model[tuple(state)][new_direction]
return self.time_weight * np.sqrt(max(0, self.time - time))
def create_state_action_value(self, state, new_direction):
if tuple(state) not in self.model:
self.model[tuple(state)] = dict()
# Actions that had never been tried before from a state
# were allowed to be considered in the planning step
for action_ in self.environment.actions:
if action_ != action:
# Such actions would lead back to the same state with a reward of zero
# Notice that the minimum time stamp is 1 instead of 0
self.model[tuple(state)][action_] = [list(state), 0, 1]
class DynaAgent(object):
def __init__(self, gamma=0.95, epsilon=0.1, alpha=0.1, planning_steps=50):
# discount factor
self.gamma = gamma
# probability for exploration
self.epsilon = epsilon
# step size
self.alpha = alpha
# planning steps
self.planning_steps = planning_steps
def epsilon_greedy_action(self, state, state_action_values, environment):
if np.random.binomial(1, self.epsilon) == 1:
return np.random.choice(environment.actions)
return np.argmax(state_action_values[state[0], state[1], :])
def epsilon_greedy_action_with_time_reward(self, state, state_action_values,
environment, planning_model):
if np.random.binomial(1, self.epsilon) == 1:
return np.random.choice(environment.actions)
Q = [planning_model.get_time_reward(state, a) for a in environment.actions]
Q += state_action_values[state[0], state[1], :]
return np.argmax(Q)
def play_episode(self, state_action_values, planning_model,
environment, action_with_time_reward=False):
steps = 0
state = environment.start_state
while state not in environment.goal_states:
steps += 1
if action_with_time_reward:
action = self.epsilon_greedy_action_with_time_reward(
state, state_action_values, environment,
planning_model
)
else:
action = self.epsilon_greedy_action(
state, state_action_values, environment
)
next_state = next_state
reward = reward
# Q-Learning update
state_action_values[state[0], state[1], action] += \
self.alpha * (reward + self.gamma * np.max(
state_action_values[new_state[0], new_state[1], :]) -
state_action_values[state[0], state[1], action])
# feed the model with experience
planning_model.save_experience(state, action, next_state, reward)
# sample experience from the model
for t in range(0, self.planning_steps):
state_sample, action_sample, new_state_sample, reward_sample, time_reward = planning_model.sample()
if not action_with_time_reward:
# only use the time_reward update if we don't select the actions
# using the time rewards
reward_sample += time_reward
state_action_values[state_sample[0], state_sample[1], action_sample] += \
self.alpha * (reward_sample + self.gamma * np.max(
state_action_values[new_state_sample[0], new_state_sample[1], :]) -
state_action_values[state_sample[0], state_sample[1], action_sample])
state = next_state
# check whether it has exceeded the step limit
if steps > environment.maxSteps:
break
return steps
```

**Changing World(as Maze)**

```
def changingMazeExperiment(environment, planning_model,
agent, num_runs=1, with_time_reward=False):
"""
Runs an experiment given a changing environmnet, with a planning agent
"""
# set up max steps
maxSteps = environment.maxSteps
# track the cumulative rewards
# rewards = np.zeros((maxSteps))
num_runs = int(num_runs)
rewards_ = np.zeros((num_runs, maxSteps))
for run in range(0, num_runs):
#print('Run ', run)
# initialize state action values
stateActionValues = environment.stateActionValues
# set old obstacles for the maze
# environment.obstacles = environment.oldObstacles
steps = 0
lastSteps = steps
while steps < maxSteps:
# play for an episode
steps += agent.play_episode(
stateActionValues, planning_model,
environment, action_with_time_reward=with_time_reward
)
# update cumulative rewards
steps_ = min(steps, maxSteps - 1)
rewards_[run, lastSteps: steps_] = rewards_[run, lastSteps]
rewards_[run, steps_] = rewards_[run, lastSteps] + 1
lastSteps = steps
# if steps > environment.changingPoint:
# change the obstacles
# environment.obstacles = environment.newObstacles
# averaging over runs
rewards = np.mean(rewards_, axis=0)
return rewards
```

**Problem Occur at this stage**

```
# step limit
model.maxSteps = 10 #model.num_states
model.changingPoint = 200
agent = DynaAgent(planning_steps=5, alpha=0.7)
dynaq_model = PlanningModel(environment=model, time_weight=0)
# This step hang in a loop
dynaq_rewards = changingMazeExperiment(
model, planning_model=dynaq_model, agent=agent, num_runs=2
)
```

Read more here: https://stackoverflow.com/questions/68480761/dyna-q-agent-with-grid-world-environment

### Content Attribution

This content was originally published by Fatih TEKE at Recent Questions - Stack Overflow, and is syndicated here via their RSS feed. You can read the original post over there.