1. np.stack((x_t, x_t, x_t, x_t), axis=2)  将图片进行串接的操作,使得图片的维度为[80, 80, 4]

参数说明: (x_t, x_t, x_t, x_t) 表示需要进行串接的图片, axis = 2 表示在第三个维度上进行串接操作

2. cv2.resize(x, [80, 80])  # 将图片的维度变化为80 * 80的维度

参数说明, x为输入的图片,80, 80表示图片变化的维度

3.cv2.cvtColor(x_t, tf.COLOR_RGB2GREY) 将图片转换为灰度图

参数说明, x_t表示输入的图片, tf.COLOR_RGB2GREY表示颜色转换的模式

4.cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)  # 将大于等于1的索引变化为255,等于0的不变,也就是对图片进行二值化操作

参数说明: x_t表示输入图片,1表示阈值,255表示大于阈值后的数值, cv2.THRESH_BINARY表示进行二值化的模式

5. random.sample(D, batch_size)  从D样本中随机抽取batch_size个数据

参数说明:D表示样本,batch_size表示抽取样本的个数

因为对于每一帧图像而言,一只鸟在图像中的移动方向可能是向上的也可能是向下的

代码说明:使用的网络是卷积网络,

输入的是4帧的图片,即80*80*4,

预测结果为当前位置的上下两个方向的V值,即奖励值

损失值: 为当前真实的奖励值: cost = V[state] - (r_1 + GAMMA * V[next_state]), 目的是为了当前的预测奖励值 与 实际公式计算的奖励值越接近越好

V[state] 表示当前位置的预测奖励值

r_1 表示下一个状态的及时奖励

V[next_state] 表示下一个位置的预测奖励值

 

代码说明: 主要代码包括来部分,第一部分:主要包括构造网络结构,生成[None, 2]的输出结果,第二部分,获得一个batch的值来进行模型的训练

第一部分:构造网络模型, 用于进行v的预测

第一步:第一层卷积的W_conv1, 大小为[8, 8, 4, 32], 第一层卷积的b_conv1, 大小为[32] 

第二步:第二层卷积的W_conv2, 大小为[4, 4, 32, 64], 第二层卷积的b_conv2, 大小为[64]

第三步:第三层卷积的W_conv3, 大小为[3, 3, 64, 64], 第三层卷积的b_conv3, 大小为[64]

第四步:使用tf.placeholder('float32', [None, 80, 80, 4])  # 初始化输入的数据s

第五步:使用tf.nn.relu() 构造第一层卷积层 步长为4,使用max_pool_2x2 进行池化操作

第六步:使用tf.nn.relu() 构造第二层卷积层 步长为2

第七步:使用tf.nn.relu(conv2d(x, W_conv3, 1) + b) 步长为1 来构造第三层卷积层

第八步:使用tf.reshape(h_conv4, [-1, 1600])  # 将卷积后的结果进行拉平操作

第九步:使用tf.nn.relu(tf.matmul(x,  w_fc1) + b)  # 进行第一次的全连接操作

第十步:使用tf.matmul(x, w_out) + b # 进行输出层的全连接操作,输出的结果为[None, action], 返货的结果为s,readout, h_fc1, s为输入结果,readout为输出结果,h_fc1为第一层全连接的输出结果

 

第二部分:将上述求得的s,readout, h_fc1, sess, 传入,这个先观察1000次,1000以后再进行模型参数的训练,同时这里使用一个epsilon, 当random.random() 小于epsilon,那么下一个动作的位置方向是随机值,否者的话就使用np.argmax(readout), 即readout预测出当前位置v值大的索引值作为方向

第一步:定义网络的损失值和训练步骤

              第一步:使用tf.placeholder('float32', [None, 2]) 用来构造a即方向

              第二步: 使用tf.placeholder('float32', [None])  用来构造实际当前位置的V值

              第三步:预测结果: action_readout = tf.reduce_sum(tf.multiply(readout, a))  来获得当前预测的结果v奖励值

              第四步: 构造cost函数,使用tf.reduce_mean(tf.square(action_readout - y))

              第五步:构造train_op, 使用tf.trian.Adaoptimer(1e-6).minimize(cost)

              第六步:使用sess.run(tf.global_variable_initial())  进行参数的初始化操作

第二步:构造第一个输出的参数, 大小为[80, 80, 4]

              第一步:使用game_state = game.GameState() 来实例化game_state的向量

              第二步:使用D = deque() 来构造存储的向量,用于存储st 当前位置, a_t下一个动作的方向,  r_t,下一个动作的及时奖励,以及st1,下一个位置

              第三步: 构造一个初始化方向, do_nothing

                             第一步: np.zeros([2]) 来构造do_nothing的初始值

                             第二步:do_nothing[0] = 1 将do_nothing的第一个位置的索引值构造为1 

              第四步:将do_nothing 输入到game_state.frame_step(do_nothing) 获得do_nothing 下一个位置的x_t当前图片, r_0为及时奖励,  terminal是否停止

              第五步:使用cv2.cvtColor(cv2.resize(x_t, (80, 80), cv2.COLOR_RGB2GRAY)) 将图片进行维度变换,同时将其转换为灰度图

              第六步: 使用cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) # 进行二值化操作

              第七步:使用np.stack((x_t, x_t, x_t, x_t), axis=2) # 将此时的四张图片做一个串接,作为第一个输入s_t 

第三步:进入循环,用于进行参数的训练, t = 0 

               第一步:使用readout.eval(feed_dict={s:s_t}) 来获得当前位置的两个方向的v值

               第二步:根据随机数和np.argmax() 来获得方向值

                             第一步:使用np.zeros(2) 来获得a_t的初始值

                            第二步: 使用random.random() 来获得随机值,如果随机值小于epsilon,使用random.ranrange(ACTION) 来获得一个0,1的随机值

                            第三步:a_t[index] = 1, 来构造a_t 

                            第四步: 如果随机值大于epsilon, 使用np.argmax(t_readout) 来获得方向较大的预测值v,来作为索引值

                             第五步: a_t[index] = 1 来构造a_t  

                第三步: 使用game_state.frame_step(a_t)  来获得x_t1的图片,r_1的及时奖励, terminal判断是否停止了, 将其与s_t的前三帧图片进行拼接

                             第一步:使用game_state.frame_step(a_t) 来获得x_t1, r_1, terminal 

                             第二步:使用cv2.cvtColor(cv2.resize(x_t1, (80, 80)), cv2.COLOR_RGB2GRAY) 将彩图转换为灰度图

                             第三步:使用cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) 进行二值化操作

                             第四步:使用np.reshape(x_t, [80, 80, 1]) 将图片转换为[80, 80, 1] 的数据

                             第五步:使用np.append(x_t, s_t[:, :, 3], axis=2) 将图片进行拼接

                第四步: 将s_t, a_t, r_1. s_t1 存储在D里面, s_t当前位置,a_t下一个方向,r_1下一个方向的奖励值,s_t1下一个位置

                             第一步:D.append((s_t, a_t, r_1, s_t1))

                             第二步:判断len(D) 大于MERORY, 使用D.popleft() 去除最后一张图片,使其保持5000的个数

                第五步:如果t > OBSERVE, 使用random.sample(D, BATCH_SIZE) 从D中所及获取batch_size章图片进行模型的训练,sess.run(train_op)

                              第一步:ministbatch = random.sample(D, BATCH_SIZE) 从D中获得batch_size的信息

                              第二步:s_j_batch = [r[0] for r in mnistbatch] 获得当前位置, a_batch 下一个动作的方向, r_batch下一个动作的及时奖励,s_j1_batch 获得下一个位置的图片

                              第三步:使用readou_batch = readout.eval(feed_dict={s:s_j1_batch}) 来获得下一个位置的v奖励值,

                              第四步:构造y_batch,用于存储当前位置的v奖励值

                                           第一步:循环len(ministbatch)

                                           第二步:terminal = ministbatch[i][4] 来获得此时的terminal

                                           第三步:if terminal 表示停止了,那么y_batch.append(r_batch[i])  # 当前的预测奖励值使用r_batch[i]

                                           第四步:如果没有停止,那么y_batch.append(r_batch[i] + GAMMA*readout_batch[i]) # 当前的预测奖励值使用r_batch[j]  + readout_batch[i] 加上的是下一个位置的奖励值

                  第六步: 使用sess.run(train_op, feed_dict={s: s_j_batch, a:a_bacth, y:y_batch}) ,s_j_batch表示当前位置,a_batch表示一个动作的方向,y_batch,表示当前位置的实际奖励值

                  第七步:s_t = s_t1, 将下一个动作的图片赋值给当前图片,t = t + 1 更新迭代次数

                  第八步:如果迭代次数为10000, 使用saver.save(global_step=t) 将模型进行保存

                  第九步:使用epislon -= (initial - final) /  进行episilon的更新操作

                  第十步:打印各个参数信息

 train.py 

import tensorflow as tf
import numpy as np
import sys
sys.path.append('game/')
import wrapped_flappy_bird as game
import random
from collections import deque
import cv2

ACTION = 2 # 方向的种类
BATCH_SIZE = 64 # 每一个batch值
INITIAL_EPSILON = 0.1 # 刚开始随机的比例
FINAL_EPSILON = 0.0001 # 结束时随机的比例
OBSEVER = 1000  # 观察的次数
EXPLORE = 300000 # 进行探索的次数
PREPLAY_MEMORY = 50000 # D中保存图片的数量
FRAME_PER_ACTION = 1 # 每一次迭代的次数
GAMMA = 0.99  # 下一次奖励的衰减值
GAME ='bird' # 游戏的名字,用于存储save的名字


# 构造w权重参数,输入为shape,使用的是tf.truncated_normal
def weight_variable(shape):

    w = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(w)
# 构造b权重参数,输入为shape, 使用的是tf.constant
def biase_variable(shape):

    b = tf.constant(0.0, shape=shape)
    return tf.Variable(b)
# 构造卷积层, 使用tf.nn.conv2d
def conv2d(x, w, stride):

    return tf.nn.conv2d(x, w, strides=[1, stride, stride, 1], padding='SAME')
# 构造池化层, 使用tf.nn.max_pool
def max_pool_2x2(x):

    return tf.nn.max_pool(x, strides=[1, 2, 2, 1], ksize=[1, 2, 2, 1], padding='SAME')

# 第一部分:用于构建网络,输出结果为当前位置两个方向的奖励值
def creatNetwork():
    # 第一步:构造第一层卷积层W的参数 [32]
    W_conv1 = weight_variable([8, 8, 4, 32])
    # 构造第一层卷积层b的参数 [32]
    b_conv1 = biase_variable([32])
    # 第二步:构造第二层卷积层w的参数, [4, 4, 32, 64]
    W_conv2 = weight_variable([4, 4, 32, 64])
    # 构造第二层卷积层b的参数 [64]
    b_conv2 = biase_variable([64])

    # 第三步:构造第三层卷积层w的参数  [3, 3, 64, 64]
    W_conv3 = weight_variable([3, 3, 64, 64])
    # 构造第三层卷积层b的参数 [64]
    b_conv3 = biase_variable([64])

    # 构造全连接的w参数[1600, 512]
    W_fc1 = weight_variable([1600, 512])
    # 构造全连接的b参数[512]
    b_fc1 = biase_variable([512])
    # 构造输出层的w参数[512, 2]
    W_out = weight_variable([512, ACTION])
    # 构造输出层的b参数[2]
    b_out = biase_variable([ACTION])

    # 第四步:构造输入数据的维度[None, 80, 80, 4]
    s = tf.placeholder('float32', [None, 80, 80, 4])
    # 第五步:第一层卷积, 使用tf.nn.relu() 和 pool层
    h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1)
    h_pool1 = max_pool_2x2(h_conv1)

    # 第六步:第二层卷积, 使用tf.nn.relu() 构造第二层卷积
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2)

    # 第七步:第三层卷积, 使用tf.nn.relu() 构造第三层卷积
    h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3)

    # 第八步: 进行维度的拉平操作
    fc_flatten = tf.reshape(h_conv3, [-1, 1600])

    # 第九步:进行第一层全连接,
    h_fc1 = tf.nn.relu(tf.matmul(fc_flatten, W_fc1) + b_fc1)

    # 第十步:构造输出层
    readout = tf.matmul(h_fc1, W_out) + b_out
    # 返回的是s, readout当前位置上下方向的奖励值, h_fc1表示前一层全连接操作
    return s, readout, h_fc1

# 第二步:用于构建训练的网络
def trainNetwork(sess, s, readout, h_fc1):

    # 第一步:构造损失函数cost和训练操作
    # 构建方向的输入值
    a = tf.placeholder('float32', [None, ACTION])
    # 构建当前位置奖励值
    y = tf.placeholder('float32', [None])
    # 使用位置信息与预测的奖励值进行相乘操作, 获得当前位置的预测奖励值
    ACTION_READOUT = tf.reduce_sum(tf.multiply(a, readout))
    # 使用预测奖励值与当前位置的奖励平方差来获得损失值
    cost = tf.reduce_mean(tf.square(y - ACTION_READOUT))
    # 使用tf.train.AdamOptimizer来构造损失值的降低函数
    train_op = tf.train.AdamOptimizer(1E-6).minimize(cost)
    # 进行参数的初始化操作
    sess.run(tf.global_variables_initializer())
    # 对游戏进行实例化操作
    game_state = game.GameState()
    # 第二步:构造第一个输入的样本,输入的数据的维度为[80, 80, 4]
    # 构造存储信息的列表
    D = deque()
    # 定义一个初始方向
    do_nothing = np.zeros([ACTION])
    # 使得初始方向的位置为do_noting[0] = 1
    do_nothing[0] = 1
    # 根据初始的方向获得第一个位置信息,x_t下一个图的位置, r_0下一个位置的及时奖励,terminal是否停止
    x_t, r_0, terminal = game_state.frame_step(do_nothing)
    # 对生成的图片使用cv2.resize进行维度的变化,将图片转换为灰度图
    x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_RGB2GRAY)
    # 进行二值化操作
    ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)
    # 使用np.stack进行图片的拼接操作, 将一帧的图片拼接为[80, 80, 4]
    s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)
    # t为第一次迭代
    t = 0
    # 发现的随机值
    epsilon = INITIAL_EPSILON
    # 第三步:进入循环进行操作的训练
    while 'angry bird' != 'happy bird':
        # 将当前位置的图片输入,获得当前位置上下方向的奖励值
        readout_t = readout.eval(feed_dict={s:[s_t]})
        if t % FRAME_PER_ACTION == 0:
            # 将位置进行初始化
            a_t = np.zeros(ACTION)
            # 如果获得的随机值小于阈值,就进行探索操作
            if random.random() < epsilon:
                # 获得随机的一个方向
                index = random.randrange(ACTION)
                # 将方向对应的位置赋值为1
                a_t[index] = 1
            else:
                # 获得当前位置两个方向较大的索引值
                index = np.argmax(readout_t)
                # 将位置的索引赋值为1
                a_t[index] = 1
        else:
            a_t = do_nothing
        # 根据方向来获得下一个位置的图片,下一个位置的及时奖励,以及是否停止
        x_t1, r_1, terminal = game_state.frame_step(a_t)
        # 将图片转化为灰度值
        x_t1 = cv2.cvtColor(cv2.resize(x_t1, (80, 80)), cv2.COLOR_RGB2GRAY)
        # 将阈值大于1的设置为255,小于1的就是0不变,进行二值化操作
        ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
        # 将x_t1进行维度变化转换为(80, 80, 1)
        x_t1 = np.reshape(x_t1, (80, 80, 1))
        # 与测试图片的前3帧图片进行串接操作
        s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)
        # 将当前位置样本,当前位置,下一个位置的奖励值,下一个位置样本,是否停止加入到D中
        D.append((s_t, a_t, r_1, s_t1, terminal))
        # 如果D的长度大于memory,使用popleft去除最后一个信息
        if len(D) > PREPLAY_MEMORY:
            D.popleft()
        # 如果循环次数t大于观测值
        if t > OBSEVER:
            # 获得一个batch的图片信息
            mnistbatch = random.sample(D, BATCH_SIZE)
            # 获得当前样本的图片信息, [80, 80, 4]
            s_j_batch = [r[0] for r in mnistbatch]
            # 获得当前位置的方向信息,即向上的索引值为[1, 0], 向下的为[0, 1]
            a_batch = [r[1] for r in mnistbatch]
            # 下一个方向的及时奖励值
            r_batch = [r[2] for r in mnistbatch]
            # 获得下一个位置的组合图片信息 [80, 80, 4]
            s_j1_batch = [r[3] for r in mnistbatch]

            # 获得下一个方向的v奖励值
            readout_j1_batch = readout.eval(feed_dict={s:s_j1_batch})
            # 用于进行v奖励值的存储
            y_batch = []
            # 循环,计算每一个位置的奖励值v, 即r
            for i in range(len(mnistbatch)):
                # 获得是够已经停止
                terminal = mnistbatch[4][i]
                if terminal:
                    # 如果停止,当前的奖励值为及时奖励
                    y_batch.append(r_batch[i])

                else:
                    # 否者,当前位置的奖励值为及时奖励 + 奖励衰减 * 下一个位置的奖励值
                    y_batch.append(r_batch[i] + GAMMA * readout_j1_batch[i])

            # 进行缩小损失值的操作,输入为方向信息,y为当前位置的奖励值, s_j_batch表示当前位置的图片信息
            train_op.eval(feed_dict={
                a : a_batch,
                y: y_batch,
                s: s_j_batch
            })
            # 将下一个阶段的位置信息赋值给当前位置,用于进行迭代
            s_t = s_t1
            # 将循环的次数+1
            t = t + 1
            # 构造saver进行参数的保存
            saver = tf.train.Saver()

            if t % 1000 == 0:
                # 每迭代1000次,就进行参数的保存
                saver.save(sess, 'saved_network/' + GAME + '-dqn', global_step=t)
        # 获得当前的位置
        state = ''
        if t <= OBSEVER:
            state = 'observe'
        elif t > OBSEVER and t <= OBSEVER + EXPLORE:
            state = 'explore'

        else:
            state = 'train'
        # 降低探索的值,以保证探索的比例越来越小    
        if epsilon > FINAL_EPSILON and t > OBSEVER:
             epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE
        
        # 进行参数的打印    
        print('TIMESTEP', t, '/STATE', state, \
              '/ EPSILON', epsilon, '/ ACTION',
              a_t, '/ REWARD', r_1, \
              '/ Q_MAX %e' % np.max(readout_t))













def playGame():
    # 构造执行函数
    sess = tf.InteractiveSession()
    # 第一部分:卷积模型的构建,输出为s输入值, readout为当前的两个方向的奖励值,h_fc1第一层全连接的结果
    s, readout, h_fc1 = creatNetwork()
    trainNetwork(sess, s, readout, h_fc1)



def main():
    # 建立游戏
    playGame()

if __name__ == '__main__':

    main()

深度学习实践-强化学习-bird游戏 1.np.stack(表示进行拼接操作) 2.cv2.resize(进行图像的压缩操作) 3.cv2.cvtColor(进行图片颜色的转换) 4.cv2.threshold(进行图片的二值化操作) 5.random.sample(样本的随机抽取)

       效果图