Dyna-Q agent with grid world environment

I am trying to solve a grid-world environment I created for a reinforcement learning project with two classical RL algorithms, Q-learning and Dyna-Q. I managed to do it with Q-learning, but when I run Dyna-Q it hangs in a loop and I cannot find out why. I have shared the relevant environment and agent code below. How can I get out of this loop, or make the code more efficient?

import numpy as np
from math import floor

def row_col_to_seq(row_col, dim_y):
    # map (row, col) pairs to a single sequential state index
    return row_col[:, 0] * dim_y + row_col[:, 1]

def seq_to_col_row(seq, dim_y):
    # map a sequential state index back to a (1, 2) array of [row, col]
    r = floor(seq / dim_y)
    c = seq - r * dim_y
    return np.array([[r, c]])
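
As a quick sanity check (the grid size here is only illustrative), the two helpers should round-trip an index:

# illustrative only: a grid with 4 columns, so dim_y = 4
print(row_col_to_seq(np.array([[2, 3]]), 4))   # -> [11]
print(seq_to_col_row(11, 4))                   # -> [[2 3]]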

Environment code:

class GridWorld:
"""
Creates a gridworld object to pass to an RL algorithm.

Parameters
----------
num_rows : int
    The number of rows in the gridworld.

num_cols : int
    The number of cols in the gridworld.

start_state : numpy array of shape (1, 2), np.array([[row, col]])
    The start state of the gridworld (can only be one start state)

goal_states : numpy array of shape (n, 2)
    The goal states for the gridworld where n is the number of goal
    states.
"""
def __init__(self, num_rows, num_cols, start_state, goal_states):
    self.num_rows = num_rows
    self.num_cols = num_cols
    self.start_state = start_state
    self.goal_states = goal_states
    self.obs_states = None
    self.bad_states = None
    self.num_bad_states = 0
    self.p_good_trans = None
    self.bias = None
    self.r_step = None
    self.r_goal = None
    self.r_dead = None
    self.gamma = 1 # default is no discounting
    self.maxSteps = float('inf')
    self.changingPoint = None
    self.num_actions = 4
    self.actions = [0, 1, 2, 3]
    self.stateActionValues = np.zeros((self.num_rows, self.num_cols, self.num_actions))
    self.resolution = 1


def add_obstructions(self, obstructed_states=None, bad_states=None, restart_states=None):
    """
    Add obstructions to the grid world.

    Obstructed states: walls that prohibit the agent from entering that state.

    Bad states: states that incur a greater penalty than a normal step.

    Restart states: states that incur a high penalty and transition the agent
                    back to the start state (but do not end the episode).

    Parameters
    ----------
    obstructed_states : numpy array of shape (n, 2)
        States the agent cannot enter where n is the number of obstructed states
        and the two columns are the row and col position of the obstructed state.

    bad_states: numpy array of shape (n, 2)
        States in which the agent incurs high penalty where n is the number of bad
        states and the two columns are the row and col position of the bad state.

    restart_states: numpy array of shape (n, 2)
        States in which the agent incurs high penalty and transitions to the start
        state where n is the number of restart states and the two columns are the
        row and col position of the restart state.
    """
    self.obs_states = obstructed_states
    self.bad_states = bad_states
    if bad_states is not None:
        self.num_bad_states = bad_states.shape[0]
    else:
        self.num_bad_states = 0
    self.restart_states = restart_states
    if restart_states is not None:
        self.num_restart_states = restart_states.shape[0]
    else:
        self.num_restart_states = 0

def add_transition_probability(self, p_good_transition, bias):
    """
    Add transition probabilities to the grid world.

    p_good_transition is the probability that the agent successfully
    executes the intended action. The action is then incorrectly executed
    with probability 1 - p_good_transition and in this case the agent
    transitions to the left of the intended transition with probability
    (1 - p_good_transition) * bias and to the right with probability
    (1 - p_good_transition) * (1 - bias).

    Parameters
    ----------
    p_good_transition : float (in the interval [0,1])
         The probability that the agents attempted transition is successful.

    bias : float (in the interval [0,1])
        The probability that the agent transitions left or right of the
        intended transition if the intended transition is not successful.
    """
    self.p_good_trans = p_good_transition
    self.bias = bias

def add_rewards(self, step_reward, goal_reward, bad_state_reward=None, restart_state_reward = None):
    """
    Define which states incur which rewards.

    Parameters
    ----------
    step_reward : float
        The reward for each step taken by the agent in the grid world.
        Typically a negative value (e.g. -1).

    goal_reward : float
        The reward given to the agent for reaching the goal state.
        Typically a middle range positive value (e.g. 10)

    bad_state_reward : float
        The reward given to the agent for transitioning to a bad state.
        Typically a middle range negative value (e.g. -6)

    restart_state_reward : float
        The reward given to the agent for transitioning to a restart state.
        Typically a large negative value (e.g. -100)
    """
    self.r_step = step_reward
    self.r_goal = goal_reward
    self.r_bad = bad_state_reward
    self.r_restart = restart_state_reward

def add_discount(self, discount):
    """
    Discount rewards so that recent rewards carry more weight than past rewards.

    Parameters
    ----------
    discount : float (in the interval [0, 1])
        The discount factor.
    """
    self.gamma = discount

def create_gridworld(self):
    """
    Create the grid world with the specified parameters.
    Returns
    -------
    self : class object
        Holds information about the environment to solve
        such as the reward structure and the transition dynamics.
    """
    self.num_actions = 4
    self.num_states = self.num_cols * self.num_rows + 1
    self.start_state_seq = row_col_to_seq(self.start_state, self.num_cols)
    self.goal_states_seq = row_col_to_seq(self.goal_states, self.num_cols)

    # rewards structure
    self.R = self.r_step * np.ones((self.num_states, 1))
    self.R[self.num_states-1] = 0
    self.R[self.goal_states_seq] = self.r_goal
    for i in range(self.num_bad_states):
        if self.r_bad is None:
            raise Exception("Bad state specified but no reward is given")
        bad_state = row_col_to_seq(self.bad_states[i,:].reshape(1,-1), self.num_cols)
        self.R[bad_state, :] = self.r_bad
    for i in range(self.num_restart_states):
        if self.r_restart is None:
            raise Exception("Restart state specified but no reward is given")
        restart_state = row_col_to_seq(self.restart_states[i,:].reshape(1,-1), self.num_cols)
        self.R[restart_state, :] = self.r_restart

    # probability model
    if self.p_good_trans is None:
        raise Exception("Must assign probability and bias terms via the add_transition_probability method.")

    self.P = np.zeros((self.num_states,self.num_states,self.num_actions))
    for action in range(self.num_actions):
        for state in range(self.num_states):

            # check if state is the fictional end state - self transition
            if state == self.num_states-1:
                self.P[state, state, action] = 1
                continue

            # check if the state is the goal state or an obstructed state - transition to end
            row_col = seq_to_col_row(state, self.num_cols)
            if self.obs_states is not None:
                end_states = np.vstack((self.obs_states, self.goal_states))
            else:
                end_states = self.goal_states

            if any(np.sum(np.abs(end_states-row_col), 1) == 0):
                self.P[state, self.num_states-1, action] = 1

            # else consider stochastic effects of action
            else:
                for dir in range(-1,2,1):
                    direction = self._get_direction(action, dir)
                    next_state = self._get_state(state, direction)
                    if dir == 0:
                        prob = self.p_good_trans
                    elif dir == -1:
                        prob = (1 - self.p_good_trans)*(self.bias)
                    elif dir == 1:
                        prob = (1 - self.p_good_trans)*(1-self.bias)

                    self.P[state, next_state, action] += prob

            # make restart states transition back to the start state with
            # probability 1
            if self.restart_states is not None:
                if any(np.sum(np.abs(self.restart_states-row_col),1)==0):
                    next_state = row_col_to_seq(self.start_state, self.num_cols)
                    self.P[state,:,:] = 0
                    self.P[state,next_state,:] = 1
    return self

def _get_direction(self, action, direction):
    """
    Takes in an action and a direction and returns a new direction.
    Parameters
    ----------
    action : int
        The current action 0, 1, 2, 3 for gridworld.
    direction : int
        Either -1, 0, 1.
    Returns
    -------
    direction : int
        Value either 0, 1, 2, 3.
    """
    left = [2,3,1,0]
    right = [3,2,0,1]
    if direction == 0:
        new_direction = action
    elif direction == -1:
        new_direction = left[action]
    elif direction == 1:
        new_direction = right[action]
    else:
        raise Exception("getDir received an unspecified case")
    return new_direction

def _get_state(self, state, direction):
    """
    Get the next_state from the current state and a direction.
    Parameters
    ----------
    state : int
        The current state.
    direction : int
        The current direction.
    Returns
    -------
    next_state : int
        The next state given the current state and direction.
    """
    row_change = [-1,1,0,0]
    col_change = [0,0,-1,1]
    row_col = seq_to_col_row(state, self.num_cols)
    row_col[0,0] += row_change[direction]
    row_col[0,1] += col_change[direction]

    # check for invalid states
    if self.obs_states is not None:
        if (np.any(row_col < 0) or
            np.any(row_col[:,0] > self.num_rows-1) or
            np.any(row_col[:,1] > self.num_cols-1) or
            np.any(np.sum(abs(self.obs_states - row_col), 1)==0)):
            next_state = state
        else:
            next_state = row_col_to_seq(row_col, self.num_cols)[0]
    else:
        if (np.any(row_col < 0) or
            np.any(row_col[:,0] > self.num_rows-1) or
            np.any(row_col[:,1] > self.num_cols-1)):
            next_state = state
        else:
            next_state = row_col_to_seq(row_col, self.num_cols)[0]
    return next_state
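
For context, the environment is built roughly like this before training; the grid size, obstacle positions and reward values below are illustrative placeholders, not necessarily the exact ones from my experiment:

# illustrative setup; the real grid, obstacles and rewards may differ
start = np.array([[0, 0]])
goals = np.array([[4, 4]])
model = GridWorld(num_rows=5, num_cols=5, start_state=start, goal_states=goals)
model.add_obstructions(obstructed_states=np.array([[1, 1], [2, 1], [3, 1]]))
model.add_rewards(step_reward=-1, goal_reward=10)
model.add_transition_probability(p_good_transition=1.0, bias=0.5)
model.add_discount(0.95)
model = model.create_gridworld()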

Planning model for Dyna-Q:

class PlanningModel(object):
def __init__(self, environment, time_weight=0):
    self.time = 0
    self.model = dict()
    self.time_weight = time_weight
    self.environment = environment
    self.rand = np.random.RandomState(0)

def save_experience(self, state, new_direction, next_state, step_reward):
    """ Deterministic environment model
    """
    self.time += 1
    self.create_state_action_value(state, new_direction)
    self.model[tuple(state)][new_direction] = [list(next_state), step_reward, self.time]

def sample(self):
    stateIndex = self.rand.choice(range(0, len(self.model.keys())))
    state = list(self.model.keys())[stateIndex]
    actionIndex = self.rand.choice(range(0, len(self.model[state].keys())))
    new_direction = list(self.model[state].keys())[actionIndex]
    newState, reward, time = self.model[state][new_direction]

    # adjust the reward with the elapsed time since the last visit
    time_reward = self.time_weight * np.sqrt(self.time - time)

    return list(state), new_direction, list(newState), reward, time_reward

def get_time_reward(self, state, new_direction):
    self.create_state_action_value(state, new_direction=None)
    newState, reward, time = self.model[tuple(state)][new_direction]
    return self.time_weight * np.sqrt(max(0, self.time - time))

def create_state_action_value(self, state, new_direction):
    if tuple(state) not in self.model:
        self.model[tuple(state)] = dict()

        # Actions that had never been tried before from a state 
        # were allowed to be considered in the planning step
        for action_ in self.environment.actions:
            if action_ != new_direction:
                # Such actions would lead back to the same state with a reward of zero
                # Notice that the minimum time stamp is 1 instead of 0
                self.model[tuple(state)][action_] = [list(state), 0, 1]
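
As a usage sketch (assuming model is a GridWorld instance as above, and with illustrative state and reward values), the model stores one observed transition and then replays it during planning:

# illustrative only: store one observed transition, then sample it back
planning_model = PlanningModel(environment=model, time_weight=0)
planning_model.save_experience(state=[0, 0], new_direction=1,
                               next_state=[1, 0], step_reward=-1)
s, a, s_next, r, time_bonus = planning_model.sample()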


class DynaAgent(object):

def __init__(self, gamma=0.95, epsilon=0.1, alpha=0.1, planning_steps=50):
    # discount factor
    self.gamma = gamma
    # probability for exploration
    self.epsilon = epsilon
    # step size
    self.alpha = alpha
    # planning steps
    self.planning_steps = planning_steps

def epsilon_greedy_action(self, state, state_action_values, environment):
    if np.random.binomial(1, self.epsilon) == 1:
        return np.random.choice(environment.actions)
    return np.argmax(state_action_values[state[0], state[1], :])

def epsilon_greedy_action_with_time_reward(self, state, state_action_values,
                                           environment, planning_model):
    if np.random.binomial(1, self.epsilon) == 1:
        return np.random.choice(environment.actions)
    # build the time bonus as an array so the addition is elementwise
    # (with a plain Python list, "+=" would extend the list rather than add)
    Q = np.array([planning_model.get_time_reward(state, a) for a in environment.actions])
    Q = Q + state_action_values[state[0], state[1], :]
    return np.argmax(Q)

def play_episode(self, state_action_values, planning_model,
                 environment, action_with_time_reward=False):
    steps = 0
    # use a flat [row, col] state so it can be indexed and used as a dict key
    state = list(environment.start_state[0])
    # "in" on numpy arrays tests elementwise, so compare whole rows instead
    while not np.any(np.all(np.asarray(state) == environment.goal_states, axis=1)):
        steps += 1

        if action_with_time_reward:
            action = self.epsilon_greedy_action_with_time_reward(
                state, state_action_values, environment,
                planning_model
            )
        else:
            action = self.epsilon_greedy_action(
                state, state_action_values, environment
            )

        # environment step: GridWorld has no step() method, so (as an
        # assumption) sample the next state from the transition model P and
        # read the reward of the state being entered from R
        state_seq = row_col_to_seq(np.array([state]), environment.num_cols)[0]
        next_seq = np.random.choice(environment.num_states,
                                    p=environment.P[state_seq, :, action])
        next_state = list(seq_to_col_row(next_seq, environment.num_cols)[0])
        reward = float(environment.R[next_seq, 0])

        # Q-Learning update
        state_action_values[state[0], state[1], action] += \
            self.alpha * (reward + self.gamma * np.max(
                    state_action_values[next_state[0], next_state[1], :]) -
            state_action_values[state[0], state[1], action])

        # feed the model with experience
        planning_model.save_experience(state, action, next_state, reward)

        # sample experience from the model
        for t in range(0, self.planning_steps):
            state_sample, action_sample, new_state_sample, reward_sample, time_reward = planning_model.sample()
            if not action_with_time_reward:
                # only use the time_reward update if we don't select the actions 
                # using the time rewards
                reward_sample += time_reward
            state_action_values[state_sample[0], state_sample[1], action_sample] += \
                self.alpha * (reward_sample + self.gamma * np.max(
                        state_action_values[new_state_sample[0], new_state_sample[1], :]) -
                state_action_values[state_sample[0], state_sample[1], action_sample]) 

        state = next_state

        # check whether it has exceeded the step limit
        if steps > environment.maxSteps:
            break

    return steps

Changing world (as a maze):

def changingMazeExperiment(environment, planning_model,
                       agent, num_runs=1, with_time_reward=False):
"""
Runs an experiment given a changing environmnet, with a planning agent
"""

# set up max steps
maxSteps = environment.maxSteps

# track the cumulative rewards
# rewards = np.zeros((maxSteps))
num_runs = int(num_runs)
rewards_ = np.zeros((num_runs, maxSteps))

for run in range(0, num_runs):
     #print('Run ', run) 

    # initialize state action values
    stateActionValues = environment.stateActionValues

    # set old obstacles for the maze
    # environment.obstacles = environment.oldObstacles

    steps = 0
    lastSteps = steps
    while steps < maxSteps:
        # play for an episode
        steps += agent.play_episode(
            stateActionValues, planning_model,
            environment, action_with_time_reward=with_time_reward
        )

        # update cumulative rewards
        steps_ = min(steps, maxSteps - 1)
        rewards_[run, lastSteps: steps_] = rewards_[run, lastSteps]
        rewards_[run, steps_] = rewards_[run, lastSteps] + 1
        lastSteps = steps

      #  if steps > environment.changingPoint:
            # change the obstacles
          #  environment.obstacles = environment.newObstacles

# averaging over runs
rewards = np.mean(rewards_, axis=0)

return rewards

The problem occurs at this stage:

# step limit
model.maxSteps = 10 #model.num_states
model.changingPoint = 200

agent = DynaAgent(planning_steps=5, alpha=0.7)
dynaq_model = PlanningModel(environment=model, time_weight=0)



# This step hangs in a loop
dynaq_rewards = changingMazeExperiment(
    model, planning_model=dynaq_model, agent=agent, num_runs=2
)


Read more here: https://stackoverflow.com/questions/68480761/dyna-q-agent-with-grid-world-environment
