深度学习实践-强化学习-bird游戏 1.np.stack(表示进行拼接操作) 2.cv2.resize(进行图像的压缩操作) 3.cv2.cvtColor(进行图片颜色的转换) 4.cv2.threshold(进行图片的二值化操作) 5.random.sample(样本的随机抽取)

时间:2023-03-09 06:55:24
深度学习实践-强化学习-bird游戏 1.np.stack(表示进行拼接操作) 2.cv2.resize(进行图像的压缩操作) 3.cv2.cvtColor(进行图片颜色的转换) 4.cv2.threshold(进行图片的二值化操作) 5.random.sample(样本的随机抽取)

1. np.stack((x_t, x_t, x_t, x_t), axis=2)  将图片进行串接的操作,使得图片的维度为[80, 80, 4]

参数说明: (x_t, x_t, x_t, x_t) 表示需要进行串接的图片, axis = 2 表示在第三个维度上进行串接操作

2. cv2.resize(x, [80, 80])  # 将图片的维度变化为80 * 80的维度

参数说明, x为输入的图片,80, 80表示图片变化的维度

3.cv2.cvtColor(x_t, tf.COLOR_RGB2GREY) 将图片转换为灰度图

参数说明, x_t表示输入的图片, tf.COLOR_RGB2GREY表示颜色转换的模式

4.cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)  # 将大于等于1的索引变化为255,等于0的不变,也就是对图片进行二值化操作

参数说明: x_t表示输入图片,1表示阈值,255表示大于阈值后的数值, cv2.THRESH_BINARY表示进行二值化的模式

5. random.sample(D, batch_size)  从D样本中随机抽取batch_size个数据

参数说明:D表示样本,batch_size表示抽取样本的个数

因为对于每一帧图像而言,一只鸟在图像中的移动方向可能是向上的也可能是向下的

代码说明:使用的网络是卷积网络,

输入的是4帧的图片,即80*80*4,

预测结果为当前位置的上下两个方向的V值,即奖励值

损失值: 为当前真实的奖励值: cost = V[state] - (r_1 + GAMMA * V[next_state]), 目的是为了当前的预测奖励值 与 实际公式计算的奖励值越接近越好

V[state] 表示当前位置的预测奖励值

r_1 表示下一个状态的及时奖励

V[next_state] 表示下一个位置的预测奖励值

代码说明: 主要代码包括来部分,第一部分:主要包括构造网络结构,生成[None, 2]的输出结果,第二部分,获得一个batch的值来进行模型的训练

第一部分:构造网络模型, 用于进行v的预测

第一步:第一层卷积的W_conv1, 大小为[8, 8, 4, 32], 第一层卷积的b_conv1, 大小为[32]

第二步:第二层卷积的W_conv2, 大小为[4, 4, 32, 64], 第二层卷积的b_conv2, 大小为[64]

第三步:第三层卷积的W_conv3, 大小为[3, 3, 64, 64], 第三层卷积的b_conv3, 大小为[64]

第四步:使用tf.placeholder('float32', [None, 80, 80, 4])  # 初始化输入的数据s

第五步:使用tf.nn.relu() 构造第一层卷积层 步长为4,使用max_pool_2x2 进行池化操作

第六步:使用tf.nn.relu() 构造第二层卷积层 步长为2

第七步:使用tf.nn.relu(conv2d(x, W_conv3, 1) + b) 步长为1 来构造第三层卷积层

第八步:使用tf.reshape(h_conv4, [-1, 1600])  # 将卷积后的结果进行拉平操作

第九步:使用tf.nn.relu(tf.matmul(x,  w_fc1) + b)  # 进行第一次的全连接操作

第十步:使用tf.matmul(x, w_out) + b # 进行输出层的全连接操作,输出的结果为[None, action], 返货的结果为s,readout, h_fc1, s为输入结果,readout为输出结果,h_fc1为第一层全连接的输出结果

第二部分:将上述求得的s,readout, h_fc1, sess, 传入,这个先观察1000次,1000以后再进行模型参数的训练,同时这里使用一个epsilon, 当random.random() 小于epsilon,那么下一个动作的位置方向是随机值,否者的话就使用np.argmax(readout), 即readout预测出当前位置v值大的索引值作为方向

第一步:定义网络的损失值和训练步骤

第一步:使用tf.placeholder('float32', [None, 2]) 用来构造a即方向

第二步: 使用tf.placeholder('float32', [None])  用来构造实际当前位置的V值

第三步:预测结果: action_readout = tf.reduce_sum(tf.multiply(readout, a))  来获得当前预测的结果v奖励值

第四步: 构造cost函数,使用tf.reduce_mean(tf.square(action_readout - y))

第五步:构造train_op, 使用tf.trian.Adaoptimer(1e-6).minimize(cost)

第六步:使用sess.run(tf.global_variable_initial())  进行参数的初始化操作

第二步:构造第一个输出的参数, 大小为[80, 80, 4]

第一步:使用game_state = game.GameState() 来实例化game_state的向量

第二步:使用D = deque() 来构造存储的向量,用于存储st 当前位置, a_t下一个动作的方向,  r_t,下一个动作的及时奖励,以及st1,下一个位置

第三步: 构造一个初始化方向, do_nothing

第一步: np.zeros([2]) 来构造do_nothing的初始值

第二步:do_nothing[0] = 1 将do_nothing的第一个位置的索引值构造为1

第四步:将do_nothing 输入到game_state.frame_step(do_nothing) 获得do_nothing 下一个位置的x_t当前图片, r_0为及时奖励,  terminal是否停止

第五步:使用cv2.cvtColor(cv2.resize(x_t, (80, 80), cv2.COLOR_RGB2GRAY)) 将图片进行维度变换,同时将其转换为灰度图

第六步: 使用cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) # 进行二值化操作

第七步:使用np.stack((x_t, x_t, x_t, x_t), axis=2) # 将此时的四张图片做一个串接,作为第一个输入s_t

第三步:进入循环,用于进行参数的训练, t = 0

第一步:使用readout.eval(feed_dict={s:s_t}) 来获得当前位置的两个方向的v值

第二步:根据随机数和np.argmax() 来获得方向值

第一步:使用np.zeros(2) 来获得a_t的初始值

第二步: 使用random.random() 来获得随机值,如果随机值小于epsilon,使用random.ranrange(ACTION) 来获得一个0,1的随机值

第三步:a_t[index] = 1, 来构造a_t

第四步: 如果随机值大于epsilon, 使用np.argmax(t_readout) 来获得方向较大的预测值v,来作为索引值

第五步: a_t[index] = 1 来构造a_t

第三步: 使用game_state.frame_step(a_t)  来获得x_t1的图片,r_1的及时奖励, terminal判断是否停止了, 将其与s_t的前三帧图片进行拼接

第一步:使用game_state.frame_step(a_t) 来获得x_t1, r_1, terminal

第二步:使用cv2.cvtColor(cv2.resize(x_t1, (80, 80)), cv2.COLOR_RGB2GRAY) 将彩图转换为灰度图

第三步:使用cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) 进行二值化操作

第四步:使用np.reshape(x_t, [80, 80, 1]) 将图片转换为[80, 80, 1] 的数据

第五步:使用np.append(x_t, s_t[:, :, 3], axis=2) 将图片进行拼接

第四步: 将s_t, a_t, r_1. s_t1 存储在D里面, s_t当前位置,a_t下一个方向,r_1下一个方向的奖励值,s_t1下一个位置

第一步:D.append((s_t, a_t, r_1, s_t1))

第二步:判断len(D) 大于MERORY, 使用D.popleft() 去除最后一张图片,使其保持5000的个数

第五步:如果t > OBSERVE, 使用random.sample(D, BATCH_SIZE) 从D中所及获取batch_size章图片进行模型的训练,sess.run(train_op)

第一步:ministbatch = random.sample(D, BATCH_SIZE) 从D中获得batch_size的信息

第二步:s_j_batch = [r[0] for r in mnistbatch] 获得当前位置, a_batch 下一个动作的方向, r_batch下一个动作的及时奖励,s_j1_batch 获得下一个位置的图片

第三步:使用readou_batch = readout.eval(feed_dict={s:s_j1_batch}) 来获得下一个位置的v奖励值,

第四步:构造y_batch,用于存储当前位置的v奖励值

第一步:循环len(ministbatch)

第二步:terminal = ministbatch[i][4] 来获得此时的terminal

第三步:if terminal 表示停止了,那么y_batch.append(r_batch[i])  # 当前的预测奖励值使用r_batch[i]

第四步:如果没有停止,那么y_batch.append(r_batch[i] + GAMMA*readout_batch[i]) # 当前的预测奖励值使用r_batch[j]  + readout_batch[i] 加上的是下一个位置的奖励值

第六步: 使用sess.run(train_op, feed_dict={s: s_j_batch, a:a_bacth, y:y_batch}) ,s_j_batch表示当前位置,a_batch表示一个动作的方向,y_batch,表示当前位置的实际奖励值

第七步:s_t = s_t1, 将下一个动作的图片赋值给当前图片,t = t + 1 更新迭代次数

第八步:如果迭代次数为10000, 使用saver.save(global_step=t) 将模型进行保存

第九步:使用epislon -= (initial - final) /  进行episilon的更新操作

第十步:打印各个参数信息

train.py

import tensorflow as tf
import numpy as np
import sys
sys.path.append('game/')
import wrapped_flappy_bird as game
import random
from collections import deque
import cv2 ACTION = 2 # 方向的种类
BATCH_SIZE = 64 # 每一个batch值
INITIAL_EPSILON = 0.1 # 刚开始随机的比例
FINAL_EPSILON = 0.0001 # 结束时随机的比例
OBSEVER = 1000 # 观察的次数
EXPLORE = 300000 # 进行探索的次数
PREPLAY_MEMORY = 50000 # D中保存图片的数量
FRAME_PER_ACTION = 1 # 每一次迭代的次数
GAMMA = 0.99 # 下一次奖励的衰减值
GAME ='bird' # 游戏的名字,用于存储save的名字 # 构造w权重参数,输入为shape,使用的是tf.truncated_normal
def weight_variable(shape): w = tf.truncated_normal(shape, stddev=0.1)
return tf.Variable(w)
# 构造b权重参数,输入为shape, 使用的是tf.constant
def biase_variable(shape): b = tf.constant(0.0, shape=shape)
return tf.Variable(b)
# 构造卷积层, 使用tf.nn.conv2d
def conv2d(x, w, stride): return tf.nn.conv2d(x, w, strides=[1, stride, stride, 1], padding='SAME')
# 构造池化层, 使用tf.nn.max_pool
def max_pool_2x2(x): return tf.nn.max_pool(x, strides=[1, 2, 2, 1], ksize=[1, 2, 2, 1], padding='SAME') # 第一部分:用于构建网络,输出结果为当前位置两个方向的奖励值
def creatNetwork():
# 第一步:构造第一层卷积层W的参数 [32]
W_conv1 = weight_variable([8, 8, 4, 32])
# 构造第一层卷积层b的参数 [32]
b_conv1 = biase_variable([32])
# 第二步:构造第二层卷积层w的参数, [4, 4, 32, 64]
W_conv2 = weight_variable([4, 4, 32, 64])
# 构造第二层卷积层b的参数 [64]
b_conv2 = biase_variable([64]) # 第三步:构造第三层卷积层w的参数 [3, 3, 64, 64]
W_conv3 = weight_variable([3, 3, 64, 64])
# 构造第三层卷积层b的参数 [64]
b_conv3 = biase_variable([64]) # 构造全连接的w参数[1600, 512]
W_fc1 = weight_variable([1600, 512])
# 构造全连接的b参数[512]
b_fc1 = biase_variable([512])
# 构造输出层的w参数[512, 2]
W_out = weight_variable([512, ACTION])
# 构造输出层的b参数[2]
b_out = biase_variable([ACTION]) # 第四步:构造输入数据的维度[None, 80, 80, 4]
s = tf.placeholder('float32', [None, 80, 80, 4])
# 第五步:第一层卷积, 使用tf.nn.relu() 和 pool层
h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1) # 第六步:第二层卷积, 使用tf.nn.relu() 构造第二层卷积
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2) # 第七步:第三层卷积, 使用tf.nn.relu() 构造第三层卷积
h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3) # 第八步: 进行维度的拉平操作
fc_flatten = tf.reshape(h_conv3, [-1, 1600]) # 第九步:进行第一层全连接,
h_fc1 = tf.nn.relu(tf.matmul(fc_flatten, W_fc1) + b_fc1) # 第十步:构造输出层
readout = tf.matmul(h_fc1, W_out) + b_out
# 返回的是s, readout当前位置上下方向的奖励值, h_fc1表示前一层全连接操作
return s, readout, h_fc1 # 第二步:用于构建训练的网络
def trainNetwork(sess, s, readout, h_fc1): # 第一步:构造损失函数cost和训练操作
# 构建方向的输入值
a = tf.placeholder('float32', [None, ACTION])
# 构建当前位置奖励值
y = tf.placeholder('float32', [None])
# 使用位置信息与预测的奖励值进行相乘操作, 获得当前位置的预测奖励值
ACTION_READOUT = tf.reduce_sum(tf.multiply(a, readout))
# 使用预测奖励值与当前位置的奖励平方差来获得损失值
cost = tf.reduce_mean(tf.square(y - ACTION_READOUT))
# 使用tf.train.AdamOptimizer来构造损失值的降低函数
train_op = tf.train.AdamOptimizer(1E-6).minimize(cost)
# 进行参数的初始化操作
sess.run(tf.global_variables_initializer())
# 对游戏进行实例化操作
game_state = game.GameState()
# 第二步:构造第一个输入的样本,输入的数据的维度为[80, 80, 4]
# 构造存储信息的列表
D = deque()
# 定义一个初始方向
do_nothing = np.zeros([ACTION])
# 使得初始方向的位置为do_noting[0] = 1
do_nothing[0] = 1
# 根据初始的方向获得第一个位置信息,x_t下一个图的位置, r_0下一个位置的及时奖励,terminal是否停止
x_t, r_0, terminal = game_state.frame_step(do_nothing)
# 对生成的图片使用cv2.resize进行维度的变化,将图片转换为灰度图
x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_RGB2GRAY)
# 进行二值化操作
ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)
# 使用np.stack进行图片的拼接操作, 将一帧的图片拼接为[80, 80, 4]
s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)
# t为第一次迭代
t = 0
# 发现的随机值
epsilon = INITIAL_EPSILON
# 第三步:进入循环进行操作的训练
while 'angry bird' != 'happy bird':
# 将当前位置的图片输入,获得当前位置上下方向的奖励值
readout_t = readout.eval(feed_dict={s:[s_t]})
if t % FRAME_PER_ACTION == 0:
# 将位置进行初始化
a_t = np.zeros(ACTION)
# 如果获得的随机值小于阈值,就进行探索操作
if random.random() < epsilon:
# 获得随机的一个方向
index = random.randrange(ACTION)
# 将方向对应的位置赋值为1
a_t[index] = 1
else:
# 获得当前位置两个方向较大的索引值
index = np.argmax(readout_t)
# 将位置的索引赋值为1
a_t[index] = 1
else:
a_t = do_nothing
# 根据方向来获得下一个位置的图片,下一个位置的及时奖励,以及是否停止
x_t1, r_1, terminal = game_state.frame_step(a_t)
# 将图片转化为灰度值
x_t1 = cv2.cvtColor(cv2.resize(x_t1, (80, 80)), cv2.COLOR_RGB2GRAY)
# 将阈值大于1的设置为255,小于1的就是0不变,进行二值化操作
ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
# 将x_t1进行维度变化转换为(80, 80, 1)
x_t1 = np.reshape(x_t1, (80, 80, 1))
# 与测试图片的前3帧图片进行串接操作
s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)
# 将当前位置样本,当前位置,下一个位置的奖励值,下一个位置样本,是否停止加入到D中
D.append((s_t, a_t, r_1, s_t1, terminal))
# 如果D的长度大于memory,使用popleft去除最后一个信息
if len(D) > PREPLAY_MEMORY:
D.popleft()
# 如果循环次数t大于观测值
if t > OBSEVER:
# 获得一个batch的图片信息
mnistbatch = random.sample(D, BATCH_SIZE)
# 获得当前样本的图片信息, [80, 80, 4]
s_j_batch = [r[0] for r in mnistbatch]
# 获得当前位置的方向信息,即向上的索引值为[1, 0], 向下的为[0, 1]
a_batch = [r[1] for r in mnistbatch]
# 下一个方向的及时奖励值
r_batch = [r[2] for r in mnistbatch]
# 获得下一个位置的组合图片信息 [80, 80, 4]
s_j1_batch = [r[3] for r in mnistbatch] # 获得下一个方向的v奖励值
readout_j1_batch = readout.eval(feed_dict={s:s_j1_batch})
# 用于进行v奖励值的存储
y_batch = []
# 循环,计算每一个位置的奖励值v, 即r
for i in range(len(mnistbatch)):
# 获得是够已经停止
terminal = mnistbatch[4][i]
if terminal:
# 如果停止,当前的奖励值为及时奖励
y_batch.append(r_batch[i]) else:
# 否者,当前位置的奖励值为及时奖励 + 奖励衰减 * 下一个位置的奖励值
y_batch.append(r_batch[i] + GAMMA * readout_j1_batch[i]) # 进行缩小损失值的操作,输入为方向信息,y为当前位置的奖励值, s_j_batch表示当前位置的图片信息
train_op.eval(feed_dict={
a : a_batch,
y: y_batch,
s: s_j_batch
})
# 将下一个阶段的位置信息赋值给当前位置,用于进行迭代
s_t = s_t1
# 将循环的次数+1
t = t + 1
# 构造saver进行参数的保存
saver = tf.train.Saver() if t % 1000 == 0:
# 每迭代1000次,就进行参数的保存
saver.save(sess, 'saved_network/' + GAME + '-dqn', global_step=t)
# 获得当前的位置
state = ''
if t <= OBSEVER:
state = 'observe'
elif t > OBSEVER and t <= OBSEVER + EXPLORE:
state = 'explore' else:
state = 'train'
# 降低探索的值,以保证探索的比例越来越小
if epsilon > FINAL_EPSILON and t > OBSEVER:
epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE # 进行参数的打印
print('TIMESTEP', t, '/STATE', state, \
'/ EPSILON', epsilon, '/ ACTION',
a_t, '/ REWARD', r_1, \
'/ Q_MAX %e' % np.max(readout_t)) def playGame():
# 构造执行函数
sess = tf.InteractiveSession()
# 第一部分:卷积模型的构建,输出为s输入值, readout为当前的两个方向的奖励值,h_fc1第一层全连接的结果
s, readout, h_fc1 = creatNetwork()
trainNetwork(sess, s, readout, h_fc1) def main():
# 建立游戏
playGame() if __name__ == '__main__': main()

深度学习实践-强化学习-bird游戏 1.np.stack(表示进行拼接操作) 2.cv2.resize(进行图像的压缩操作) 3.cv2.cvtColor(进行图片颜色的转换) 4.cv2.threshold(进行图片的二值化操作) 5.random.sample(样本的随机抽取)

效果图