
I know there are a lot of similar topics discussed on StackOverflow, but I have done quite a lot of research both on StackOverflow and on the rest of the internet, and I could not find a solution. I am trying to implement the classic Deep Q-Learning algorithm to solve the OpenAI Gym CartPole game: OpenAI Gym Cartpole.

First, I created an agent that simply generates random weights. The results are shown in the graph below.

[Plot: the agent beats CartPole using random search]

Surprisingly, the agent was able to reach 200 steps (the maximum) in many episodes just by generating 4 random uniform weights [w1, w2, w3, w4] in (-1.0, 1.0) at each episode.
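
Roughly, that random-search agent looks like the sketch below (a minimal reconstruction, not my exact script; it just samples a fresh linear policy every episode and acts on the sign of the dot product with the observation):

import gym
import numpy as np

env = gym.make('CartPole-v0')
for episode in range(50):
    # One fresh random linear policy [w1, w2, w3, w4] per episode.
    w = np.random.uniform(-1.0, 1.0, size=4)
    state = env.reset()
    done = False
    steps = 0
    while not done:
        # Push RIGHT (1) if the weighted sum of the observation is positive, else LEFT (0).
        action = 1 if np.dot(w, state) > 0 else 0
        state, reward, done, _ = env.step(action)
        steps += 1
    print('Episode', episode, 'steps =', steps)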

So, I decided to implement a simple DQN with only 4 weights and 2 biases and let the agent learn the game over time. The weights are initialized randomly and are updated via backpropagation as the agent takes steps.

I used an Epsilon-Greedy strategy so that the agent explores at the beginning and exploits the Q values later on. However, the results are disappointing compared to the random agent:

[Plot: the DQN agent's steps per episode]

I have tried tuning a lot of parameters and trying different architectures, but the result does not change much. So, my question is the following:

Question: Is my implementation of DQN wrong, or is a simple DQN like this just unable to beat CartPole? What is your experience? The loss (error) does decrease, but that does not guarantee a good solution. Thanks in advance.
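
For reference, the target that the train() method below is meant to compute is the standard Q-learning target (gamma is the discount factor and Q_target is the frozen target network):

    target(s, a) = r                                         if s' is terminal
    target(s, a) = r + gamma * max_a' Q_target(s', a')       otherwise

The loss is the mean squared error between these targets and the online network's predictions Q(s, a). Here is my full code: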

import tensorflow as tf
import gym
import numpy as np
import random as rand
import matplotlib.pyplot as plt

# Cartpole's Observation:
#   4 Inputs
#   2 Actions (LEFT | RIGHT)
input_size = 4
output_size = 2

# Deep Q Network Class
class DQN:
    def __init__(self, var_names):
        self.var_names = var_names

        self._define_placeholders()
        self._add_layers()
        self._define_loss()
        self._choose_optimizer()
        self._initialize()

    # Placeholders:
    # Inputs: The place where we feed the Observations (States).
    # Targets: Q_target = R + gamma*Q(s', a*).
    def _define_placeholders(self):
        self.inputs = tf.placeholder(tf.float32, shape=(None, input_size), name='inputs')
        self.targets = tf.placeholder( tf.float32, shape=(None, output_size), name='targets')

    # Layers:
    # 4 Input Weights.
    # 2 Biases.
    # output = softmax(inputs*weights + biases).
    # Weights and biases are initialized randomly.
    def _add_layers(self):
        w = tf.get_variable(name=self.var_names[0], shape=(input_size, output_size),
                                initializer=tf.initializers.random_uniform(minval=-1.0, maxval=1.0) )
        b = tf.get_variable(name=self.var_names[1], shape=(output_size),
                                initializer=tf.initializers.random_uniform(minval=-1.0, maxval=1.0) )
        self.outputs = tf.nn.softmax(tf.matmul(self.inputs, w) + b)
        self.prediction = tf.argmax(self.outputs, 1)

    # Loss = MSE.
    def _define_loss(self):
        self.mean_loss = tf.losses.mean_squared_error(labels=self.targets, predictions=self.outputs) / 2

    # AdamOptimizer with starting learning rate: a = 0.005.
    def _choose_optimizer(self):
        self.optimizer = tf.train.AdamOptimizer(learning_rate=0.005).minimize(loss=self.mean_loss)

    # Initializes the dqn's weights.
    def _initialize(self):
        initializer = tf.global_variables_initializer()
        self.sess = tf.InteractiveSession()
        self.sess.run(initializer)

    # Gets the current DQN's weights.
    def get_weights(self):
        return [ self.sess.run( tf.trainable_variables(var) )[0] for var in self.var_names ]
        
    # Updates the weights of DQN.
    def update_weights(self, new_weights):
        variables = [tf.trainable_variables(name)[0] for name in self.var_names]
        update = [ tf.assign(var, weight) for (var, weight) in zip(variables, new_weights) ]
        self.sess.run(update)

    # Predicts the best possible action from a state s.
    # a* = argmax( Q(s) )
    # Returns Q(s) and a*.
    def predict(self, states):
        Q, actions = self.sess.run( [self.outputs, self.prediction],
                                    feed_dict={self.inputs: states} )
        return Q, actions

    # It partially fits the given observations and the targets into the network.
    def partial_fit(self, states, targets):
        _, loss = self.sess.run( [self.optimizer, self.mean_loss],
                                    feed_dict={self.inputs: states, self.targets: targets} )
        return loss

# Replay Memory Buffer
# It stores experiences as (s, a, r, s') --> (State, Action, Reward, Next_State).
# It generates random mini-batches of experiences from the memory.
# If the memory is full, it deletes the oldest experience. An experience is a single step.
class ReplayMemory:
    def __init__(self, mem_size):
        self.mem_size = mem_size
        self.experiences = []

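    # Appends a new experience and evicts the oldest one once the memory exceeds mem_size.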
    def add_experience(self, xp):
        self.experiences.append(xp)
        if len(self.experiences) > self.mem_size:
            self.experiences.pop(0)

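    # Returns batch_size randomly sampled experiences, or all of them if the memory holds fewer.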
    def random_batch(self, batch_size):
        if len(self.experiences) < batch_size:
            return self.experiences
        else:
            return rand.sample(self.experiences, batch_size)

# The agent's class.
# It contains 2 DQNs: Online DQN for Predictions and Target DQN for the targets.
class Agent:
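    # epsilon, epsilon_decay, min_epsilon: the epsilon-greedy exploration schedule.
    # gamma: the discount factor. mem_size: the capacity of the replay memory.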
    def __init__(self, epsilon, epsilon_decay, min_epsilon, gamma, mem_size):
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon
        self.gamma = gamma
        self.replay_mem = ReplayMemory(mem_size)
        self.online_dqn = DQN( var_names=['online_w', 'online_b'] )
        self.target_dqn = DQN( var_names=['target_w', 'target_b'] )
        self.state = None

    def set_epsilon(self, epsilon):
        self.epsilon = epsilon

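    # Decays epsilon by epsilon_decay while it is still above min_epsilon (called once per episode).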
    def reduce_epsilon(self):
        if self.epsilon > self.min_epsilon:
            self.epsilon -= self.epsilon_decay
    
    def update_state(self, state):
        self.state = state

    def update_memory(self, state, action, reward, next_state):
        experience = (state, action, reward, next_state)
        self.replay_mem.add_experience(experience)

    # It updates the target network after N steps.
    def update_network(self):
        self.target_dqn.update_weights( self.online_dqn.get_weights() )

    # Randomly chooses an action from the environment.
    def explore(self, env):
        action = env.action_space.sample()
        return action

    # Predicts and chooses the best possible move from the current state.
    def exploit(self):
        _, action = self.online_dqn.predict(self.state)
        return action[0]

    # Uses Epsilon-Greedy to decide whether to explore or exploit.
    # Epsilon starts with 1 and is reduced over the time.
    # After the agent makes a move, it returns: state, action, reward, next_state, done.
    def take_action(self, env):
        action = None
        p = rand.uniform(0.0, 1.0)
        if p < self.epsilon:
            action = self.explore(env)
        else:
            action = self.exploit()
        next_state, reward, done, _ = env.step(action)
        if done:
            next_state = None
        else:
            next_state = np.reshape( next_state, (1, input_size) )
        return self.state, action, reward, next_state, done

    # Trains the agent.
    # A random mini-batch is generated from the memory.
    # We feed each experience into the DQN.
    # For each state s in the batch, Q(s) is taken from the target network,
    # and for each next state s', both Q(s') and a* = argmax_a' Q(s', a') are
    # taken from the target network as well. The targets start as a copy of Q(s).

    # Then, for each (action a, reward r, next_state s') in the batch:
    # If s' is None, the episode is over, so we set target[i][a] = r.
    # Otherwise, target[i][a] = r + gamma * Q(s', a*).

    # Finally, the online DQN computes the mean squared error between these targets and its own predictions Q(s, a)
    # and uses Back-Propagation to update the weights.
    def train(self):
        mini_batch = self.replay_mem.random_batch(batch_size=256)
        batch_size = len(mini_batch)
        states = np.zeros( shape=(batch_size, input_size) )
        next_states = np.zeros( shape=(batch_size, input_size) )
        for i in range(batch_size):
            states[i] = mini_batch[i][0]
            next_states[i] = mini_batch[i][3]

        Q, _ = self.target_dqn.predict(states)
        next_Q, next_actions = self.target_dqn.predict(next_states)
        targets = Q
        for i in range(batch_size):
            action = mini_batch[i][1]
            reward = mini_batch[i][2]
            next_state = mini_batch[i][3]
            if next_state is None:
                targets[i][action] = reward
            else:
                targets[i][action] = reward + self.gamma * next_Q[i][ next_actions[i] ]
        loss = self.online_dqn.partial_fit(states, targets)
        return loss
    
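# Runs the agent for the given number of episodes.
# N: number of steps between target-network updates.
# render: whether to draw the environment.
# train: whether to perform gradient steps, sync the target network and decay epsilon.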
def play(agent, env, episodes, N, render=False, train=True):
    ep = 0
    episode_steps = []
    steps = 0
    total_steps = 0
    loss = 0

    # Sets the current state as the initial.
    # Cartpole spawns the agent in a random state.
    agent.update_state( np.reshape( env.reset(), (1, input_size) ) )
    agent.update_network()

    while ep < episodes:
        if render:
            env.render()
    
        # The target DQN's weights are frozen.
        # The agent copies the online DQN's weights into it every N steps (here N = 100).
        if train and total_steps % N == 0:
            agent.update_network()
            print('---Target network updated---')

        # Takes action.
        state, action, reward, next_state, done = agent.take_action(env)

        # Updates the memory and the current state.
        agent.update_memory(state, action, reward, next_state)
        agent.update_state(next_state)
        steps += 1
        total_steps += 1

        if train:
            loss = agent.train()

        if done:
            agent.update_state( np.reshape( env.reset(), (1, input_size) ) )
            episode_steps.append(steps)
            ep += 1
            if train:
                agent.reduce_epsilon()
                print('End of episode', ep, 'Training loss =', loss, 'Steps =', steps)
            steps = 0

    if render:
        env.close()

    return episode_steps

env = gym.make('CartPole-v0')

# Training the agent.
agent = Agent(epsilon=1, epsilon_decay = 0.01, min_epsilon = 0.05, gamma=0.9, mem_size=50000)
episodes = 1000
N = 100
episode_steps = play(agent, env, episodes, N)

# Plotting the results.
# After the training is done, the steps should be maximized (up to 200)
plt.plot(episode_steps)
plt.show()

# Testing the agent.
agent.set_epsilon(0)
episodes = 1
steps = play(agent, env, episodes, N, render=True, train=False)[0]
print('\nSteps =', steps)

1 Answer