倒立摆Cartpole-v1

简介

倒立摆
倒立摆为一个小车和一个杆通过轴连接,杆在初始时稍有偏离垂直线,在重力作用下会倒下,游戏目的是通过左右控制小车(施加左右的力)来避免杆的倒下。

API

获取初始状态

env = gym.make("CartPole-v1")
state = env.reset()

state为一个包含4个元素的list,分别表示小车位置,小车速度,杆的角度,杆的角速度,具体范围如下

Type: Box(4)
Num     Observation               Min                     Max
0       Cart Position             -4.8                    4.8
1       Cart Velocity             -Inf                    Inf
2       Pole Angle                -0.418 rad (-24 deg)    0.418 rad (24 deg)
3       Pole Angular Velocity     -Inf                    Inf

执行操作:

next_state, reward, done, info = env.step(action)

action为0或1,分别表示对小车向左施加力或向右施加力
返回值为4个参数
next_state表示操作完成后的状态list(4)
reward为操作奖励,永远为1
done表示是否结束游戏,标准为杆的角度超过12度,或车的位置超过2.4(屏幕看不见了),游戏回合数超过500(成功)
info为空字典

绘制画面到屏幕:

env.render()

Deep Q-Learning算法

Q ( s , a ) Q(s,a) Q(s,a)是程序的记忆,通过记忆,计算状态s下,执行操作a的权值
每次在操作集合 A A A中选择使 Q Q Q最大的 a a a操作执行,即 arg max ⁡ a ∈ A Q ( s , a ) \argmax_{a \in A} Q(s,a) aAargmaxQ(s,a)

Q Q Q的转移公式:
Q ( s , a ) = R ( s , a ) + γ   max ⁡ a ~ Q ( s ~ , a ~ ) Q(s,a)=R(s,a)+\gamma \ \max_{\tilde a}Q(\tilde s ,\tilde a) Q(s,a)=R(s,a)+γ a~maxQ(s~,a~)
R ( s , a ) R(s,a) R(s,a)为在状态s下,执行操作a获得的奖励(即眼前的价值)
s ~ \tilde s s~为状态s在执行操作a后的新状态
Q ( s ~ , a ~ ) Q(\tilde s ,\tilde a) Q(s~,a~)为新状态的Q值
γ \gamma γ为折扣因子,可以看出 γ \gamma γ越大,程序越看重记忆中的经验价值,反之,越在乎眼前的利益

对普通的Q-Learning算法,Q函数可以用一张表格表示,如使用二维数组存储对每个操作和状态下的Q值

在Deep Q-Learning中,Q函数为神经网络

算法流程

初始时,程序没有经验,Q函数初值为随机

  1. 随机选择一个状态s
  2. 按照一定探索概率 ϵ \epsilon ϵ 随机选择可行操作,或者根据Q经验选择操作
  3. 用选择的操作 a a a,得到下一个状态 s ~ \tilde s s~
  4. 更新 Q ( s , a ) Q(s,a) Q(s,a):对于普通Q-Learning更新Q表格即可;对于Deep Q-Learning 则从将本次操作的 ( s , a , s ~ ) (s,a,\tilde s) (s,a,s~)存进经验库中,当经验库的量足够大时,从中选择若干条经验作为一个batch,训练一次神经网络

对于探索率 ϵ \epsilon ϵ,可以初始设为1,即在没有经验时尽量探索试错,积累经验,随着迭代次数,降低 ϵ \epsilon ϵ,最终就能通过经验做出准确决策。

代码

import tensorflow as tf
import numpy as np
import gym
import random


class DNN(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=250, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=250, activation=tf.nn.relu)
        self.dense3 = tf.keras.layers.Dense(units=2)

    def call(self, input):
        x = self.dense1(input)
        x = self.dense2(x)
        x = self.dense3(x)
        return x

    def predict(self, input):
        y = self(input)
        # print(y)
        # print(tf.math.argmax(y,1).numpy())
        return tf.math.argmax(y, 1).numpy()


batch_size = 32
train_episodes = 500
explore_episodes = 100
initial_epsilon = 1.0
gamma = 1.0
final_epsilon = 0.01
learning_rate = 1e-3

env = gym.make("CartPole-v1")
model = DNN()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
expirence = []

for episode_id in range(train_episodes):
    state = env.reset()
    epsilon = max(initial_epsilon * (train_episodes-episode_id) /
                  train_episodes, final_epsilon)

    step = 0
    while True:
    #for step in range(explore_episodes):
        step = step + 1
        env.render()

        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = model.predict(np.expand_dims(state, 0))[0]

        next_state, reward, done, info = env.step(action)
        reward = -10 if done else reward
        expirence.append((state, action, next_state, reward, 0 if done else 1))
        state = next_state

        #if done or step == explore_episodes-1:
        if done:
            print("episode %d, epsilon %f, step %d" % (episode_id, epsilon, step))
            break

        if len(expirence) >= batch_size:
            batch_state, batch_action, batch_next_state, batch_reward, batch_flag = zip(
                *random.sample(expirence, batch_size))
            batch_state = np.array([s for s in batch_state])
            batch_action = np.array([s for s in batch_action])
            batch_next_state = np.array([s for s in batch_next_state])
            batch_reward = np.array([s for s in batch_reward])
            batch_flag = np.array([s for s in batch_flag])
            #print(batch_state)
            Y = np.array(model(batch_next_state))
            Y = batch_reward + \
                (gamma * tf.reduce_max(Y, axis=1)* batch_flag)
            with tf.GradientTape() as tape:
                y_pred = tf.reduce_sum(
                    model(batch_state) * tf.one_hot(batch_action, depth=2), axis=1)
                loss = tf.keras.losses.mean_squared_error(
                    y_true=Y, y_pred=y_pred)
            grads = tape.gradient(loss, model.variables)
            optimizer.apply_gradients(
                grads_and_vars=zip(grads, model.variables))

学习效果

最终基本能保持平衡
在这里插入图片描述

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐