python_强化学习算法DQN_玩五子棋游戏
发布日期:2021-06-29 19:49:33 浏览次数:3 分类:技术文章

本文共 17285 字,大约阅读时间需要 57 分钟。

本文公开一个基于强化学习算法DQN的五子棋游戏自动下棋算法源码,并对思路进行讲解。

完整代码和预训练模型(Saver文件夹)地址:

一个基于CNN构成的DQN算法的8*8的五子棋游戏

在这里插入图片描述

1、Q-Learning介绍

Q-Learning的思想并不是很复杂,很多文章都有详细的介绍,这里只是简单举个例子,不做详细讲解。

2、DQN介绍

DQN也叫deepQ-Learning,在Q-Learning前面加一个Deep。Q-Learning有一个缺点,如果状态特别多,比如五子棋的棋盘,每个位置都有(空白、黑子、白子)三个状态,那么假如一个10*10的棋盘 就有3^100个状态,那么这个Q表是没办法做出来的。那么我们就没办法构建这个Q表来获取状态价值状态转移价值了。

DQN就是搭建一个人工神经网络,输入是当前状态,输出是状态转移价值。或者输入是当前状态,输出是当前状态的Q值。通过多次迭代训练,使得神经网络输出逼近真实的Q值(逼近而不是等于,因为毕竟是神经网络,参数数量,存储占用量远小于Q表,如果能做到完全等于的话,还要存储干啥)

那么神经网络的训练的损失就是 预测Q值和(max(下一步的真实Q值)乘系数 +奖励值)的差的平方。 预测Q值就是神经网络一次前向传播输出的Q值,真实Q值就是神经网络曾经预测的Q值。为什么真实Q值是神经网络曾经预测过的Q值呢?因为神经网络每次训练都会对输出值产生影响,如果这个真实Q值一直变化的话,那么神经网络是没办法收敛的。所以需要搭建另一个参数一模一样的神经网络来生成真实Q值。这个生成真实Q值的网络不需要训练,只需要迭代一定次数以后,复制一份预测网络的参数即可。就好比一个笨老师教一个学生,学生学会了以后当了老师,教新的学生,然后青出于蓝而胜于蓝,这个学生越来越强。

本文中代码用的方法是,保存历史预测的Q值,等一个棋局结束后,再用这些Q值来训练每一步的预测Q值,这样做到一个神经网络就可以了。相当于一个聪明的学生,不停的复习,归纳,总结,然后逐渐变强。

3、对抗算法介绍

根据上面介绍的Q-Learning算法,解决的是一个单智能体的问题,这个智能体如何能够用最小的代价获得最大的回报。但是对弈的学习过程不一样,博弈中存在两个智能体,当前状态和当前动作对应的下一个状态会有很多,因为对手怎么下子我们不知道。那么当前状态和当前动作对应的什么状态是固定的呢?对手的状态。那么我能不能预测一下对手下一步能达到的最大的Q值呢?对手的Q值和我的Q值又有什么关系呢?对于零和博弈,对手的优势就是我得劣势,对手的劣势就是我的优势,那么我就可以用对手的Q值乘一个负的系数来训练当前的Q值。这样就解决了。

训练的过程就是,先自己和自己下一局棋,并记录每一步和每一步预测的最大Q值。等棋局结束后,再把整个棋局用神经网络"回顾"一遍,用记录的步子,Q值训练。

4、训练过程中注意的地方

下子的时候按照常理,咱们都是选择Q值最大的动作来下子,这样下子是没问题的,但是我们是来训练网络的,如果每次选择最大的步子下子的话容易陷入一个僵局。获胜方一直用同样或相似的套路打败败方,神经网络很快损失下降很快,但是还是不会正确的落子,或者说它只对某一种棋局局面的风格掌握得很好,对不按照套路出牌的人就没办法应对。那么我们就要加一个随即事件,一部分步子是按照最大值去走的,一部分步子是随机走的,但是最大Q值是每次都要计算出来保存用于回顾训练用的。

不同的棋子最好放在不同的channel里面,我发现如果用0背景1白棋2黑棋这样标注放到一个棋盘里面神经网络无法收敛

5、完整代码

运行代码入口如下,已经写了很详细的注释了,我很辛苦的

如果需要预训练模型需要从文章开始的链接下载Saver文件夹

import numpy as npimport randomimport osimport tensorflow.compat.v1 as tftf.disable_v2_behavior()from DQN_point_game import Map'''此文件主要用于实现强化学习算法DQN玩五子棋'''class DQN():    def __init__(self):        self.n_input = Map.mapsize * Map.mapsize        self.n_output = 1        self.current_q_step = 0        self.avg_loss = 0        # placeholder是在神经网络构建graph的时候在模型中的占位,此时并没有把要输入的数据传入模型,它只会分配必要的内存。        # 建立完session后,在会话中,运行模型的时候通过feed_dict()函数向占位符喂入数据。        self.x = tf.placeholder("float", [None, Map.mapsize, Map.mapsize], name='x')        self.y = tf.placeholder("float", [None, self.n_output], name='y')        self.create_Q_network()        self.create_training_method()        self.saver = tf.train.Saver()        self.sess = tf.Session()        # 它能让你在运行图的时候,插入一些计算图        self.sess = tf.InteractiveSession()        self.sess.run(tf.global_variables_initializer())    def create_Q_network(self):        # tf.random_normal()函数用于从“服从指定正态分布的序列”中随机取出指定个数的值。  stddev: 正态分布的标准差        wc1 = tf.Variable(tf.random_normal([3, 3, 1, 64], stddev=0.1), dtype=tf.float32, name='wc1')        wc2 = tf.Variable(tf.random_normal([3, 3, 64, 128], stddev=0.1), dtype=tf.float32, name='wc2')        wc3 = tf.Variable(tf.random_normal([3, 3, 128, 256], stddev=0.1), dtype=tf.float32, name='wc3')        wd1 = tf.Variable(tf.random_normal([256, 128], stddev=0.1), dtype=tf.float32, name='wd1')        wd2 = tf.Variable(tf.random_normal([128, self.n_output], stddev=0.1), dtype=tf.float32, name='wd2')        # tf.Variable 得到的是张量,而张量并不是具体的值,而是计算过程        bc1 = tf.Variable(tf.random_normal([64], stddev=0.1), dtype=tf.float32, name='bc1')        bc2 = tf.Variable(tf.random_normal([128], stddev=0.1), dtype=tf.float32, name='bc2')        bc3 = tf.Variable(tf.random_normal([256], stddev=0.1), dtype=tf.float32, name='bc3')        bd1 = tf.Variable(tf.random_normal([128], stddev=0.1), dtype=tf.float32, name='bd1')        bd2 = tf.Variable(tf.random_normal([self.n_output], stddev=0.1), dtype=tf.float32, name='bd2')        weights = {
'wc1': wc1, 'wc2': wc2, 'wc3': wc3, 'wd1': wd1, 'wd2': wd2 } biases = {
'bc1': bc1, 'bc2': bc2, 'bc3': bc3, 'bd1': bd1, 'bd2': bd2 } self.Q_value = self.conv_basic(self.x, weights, biases) self.Q_Weihgts = [weights, biases] def conv_basic(self, _input, _w, _b): # input _out = tf.reshape(_input, shape=[-1, Map.mapsize, Map.mapsize, 1]) # conv layer 1 conv2d 用于做二维卷积 strides, # 步长参数 padding, # 卷积方式 _out = tf.nn.conv2d(_out, _w['wc1'], strides=[1, 1, 1, 1], padding='SAME') # bias_add 一个叫bias的向量加到一个叫value的矩阵上,是向量与矩阵的每一行进行相加 _out = tf.nn.relu(tf.nn.bias_add(_out, _b['bc1'])) # ksize 池化窗口的大小,取一个四维向量 padding: 填充的方法,SAME或VALID,SAME表示添加全0填充,VALID表示不添加 _out = tf.nn.max_pool(_out, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME') # conv layer2 _out = tf.nn.conv2d(_out, _w['wc2'], strides=[1, 1, 1, 1], padding='SAME') _out = tf.nn.relu(tf.nn.bias_add(_out, _b['bc2'])) _out = tf.nn.max_pool(_out, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME') # conv layer3 _out = tf.nn.conv2d(_out, _w['wc3'], strides=[1, 1, 1, 1], padding='SAME') _out = tf.nn.relu(tf.nn.bias_add(_out, _b['bc3'])) # 计算张量tensor沿着指定的数轴(tensor的某一维度)上的的平均值,主要用作降维或者计算tensor(图像)的平均值。 _out = tf.reduce_mean(_out, [1, 2]) # fully connected layer1 matmul 两个矩阵中对应元素各自相乘 _out = tf.nn.relu(tf.add(tf.matmul(_out, _w['wd1']), _b['bd1'])) # fully connected layer2 _out = tf.add(tf.matmul(_out, _w['wd2']), _b['bd2']) return _out def create_training_method(self): # squared_difference 计算张量 x、y 对应元素差平方 self.cost = tf.reduce_mean(tf.squared_difference(self.Q_value, self.y)) self.optm = tf.train.AdamOptimizer(learning_rate=0.001, name='Adam').minimize(self.cost) def restore(self): if os.path.exists('Saver/cnnsaver.ckpt-0.index'): self.saver.restore(self.sess, os.path.abspath('Saver/cnnsaver.ckpt-0')) def computerPlay(self, IsTurnWhite): if IsTurnWhite: print('白旗走') # 如果该白旗走的话 用黑的棋盘,1代表黑,-1代表白 board = np.array(Map.blackBoard) else: print('黑旗走') # 如果该黑旗走的话 用白的棋盘 1代表白,-1代表黑 board = np.array(Map.whiteBoard) # 建立所有可下位置的数组,每下一个位置一个数组 boards = [] # 当前棋谱中空白的地方 positions = [] for i in range(Map.mapsize): for j in range(Map.mapsize): # 如果这个当前棋谱这个位置是空白的 if board[j][i] == Map.backcode: predx = np.copy(board) # -1代表自己,更方便计算 predx[j][i] = -1 boards.append(predx) positions.append([i, j]) if len(positions) == 0: return 0, 0, 0 # 计算所有可下的位置的价值 nextStep = self.sess.run(self.Q_value, feed_dict={
self.x: boards}) maxx = 0 maxy = 0 maxValue = -1000 # 实际最大价值 用于后续学习 # 从所有可下的地方找一个价值最大的位置下棋 for i in range(len(positions)): value = nextStep[i] + random.randint(0, 10) / 1000 # 如果没有最优步子 则随机选择一步 if value > maxValue: maxValue = value maxx = positions[i][0] maxy = positions[i][1] print(str(maxx) + ',' + str(maxy)) print('此位置的价值为:' + str(maxValue[0])) return maxx, maxy, maxValue # 下完了一局就更新一下AI模型 def TrainOnce(self, winner): # 记录棋图 # board1 白棋 board2 黑棋 board1 = np.array(Map.mapRecords1) board2 = np.array(Map.mapRecords2) # 记录棋步 step1 = np.array(Map.stepRecords1) step2 = np.array(Map.stepRecords2) # 记录得分 scoreR1 = np.array(Map.scoreRecords1) scoreR2 = np.array(Map.scoreRecords2) board1 = np.reshape(board1, [-1, Map.mapsize, Map.mapsize]) board2 = np.reshape(board2, [-1, Map.mapsize, Map.mapsize]) step1 = np.reshape(step1, [-1, Map.mapsize, Map.mapsize]) step2 = np.reshape(step2, [-1, Map.mapsize, Map.mapsize]) score1 = [] score2 = [] board1 = (board1 * (1 - step1)) + step1 * Map.blackcode board2 = (board2 * (1 - step2)) + step2 * Map.blackcode # 每步的价值 = 奖励(胜1 负-0.9) + 对方棋盘能达到的最大价值(max taget Q) * (-0.9) for i in range(len(board1)): if i == len(scoreR2): # 白方已经五连 白方赢 print('白方已经五连,白方赢') score1.append([1.0]) # 白方的最后一步获得1分奖励 else: # 白方的价值为:黑方棋盘能达到的最大价值(max taget Q) * (-0.9) score1.append([scoreR2[i][0] * -0.9]) if winner == 2: print('惩罚白方的最后一步,将其价值设为 -0.9') score1[len(score1) - 1][0] = -0.9 # 1 白棋 2 黑棋 for i in range(len(board2)): if i == len(scoreR1) - 1: # 黑方赢 print('黑方已经五连,黑方赢') score2.append([1.0]) else: # 黑棋的得分为:白方棋盘能达到的最大价值(max taget Q) * (-0.9) score2.append([scoreR1[i + 1][0] * -0.9]) if winner == 1: print('惩罚黑方的最后一步,将其价值设为 -0.9') # 惩罚黑方的最后一步 score2[len(score2) - 1][0] = -0.9 # 一次完成多个数组的拼接 borders = np.concatenate([board1, board2], axis=0) scores = np.concatenate([score1, score2], axis=0) _, totalLoss = self.sess.run([self.optm, self.cost], feed_dict={
self.x: borders, self.y: scores}) self.avg_loss += totalLoss print('train avg loss ' + str(self.avg_loss)) self.avg_loss = 0 # os.path.abspath取决于os.getcwd,如果是一个绝对路径,就返回, # 如果不是绝对路径,根据编码执行getcwd/getcwdu.然后把path和当前工作路径连接起来 self.saver.save(self.sess, os.path.abspath('Saver/cnnsaver.ckpt'), global_step=0) def PlayWidthHuman(self): # 读取历史存储的模型 self.restore() Map.PlayWithComputer = self.computerPlay Map.TrainNet = self.TrainOnce Map.ShowWind()if __name__ == '__main__': dqn = DQN() dqn.PlayWidthHuman()

用于构建棋谱的代码

Map.py

import tkinter as tkimport osimport timeimport copy# 定义窗口top = tk.Tk()top.title("AI自动玩五子棋")top.geometry('400x300')# 定义地图尺寸mapsize = 8# 元素尺寸pixsize = 20# 连子个数winSet = 5# 空白编号backcode = 0# 白棋whitecode = 1# 黑棋blackcode = -1# 定义画布canvas = tk.Canvas(top, height=mapsize * pixsize, width=mapsize * pixsize,                   bg="gray")canvas.pack(pady=25)for i in range(mapsize):    canvas.create_line(i * pixsize, 0,                       i * pixsize, mapsize * pixsize,                       fill='black')    canvas.create_line(0, i * pixsize,                       mapsize * pixsize, i * pixsize,                       fill='black')# 初始棋盘whiteBoard = []stepBoard = []for i in range(mapsize):    row = []    rowBak = []    for j in range(mapsize):        row.append(0)        rowBak.append(backcode)    whiteBoard.append(rowBak)    stepBoard.append(row)blackBoard = copy.deepcopy(whiteBoard)# 棋子列表childMap = []# 记录棋图mapRecords1 = []mapRecords2 = []# 记录棋步stepRecords1 = []stepRecords2 = []# 记录得分scoreRecords1 = []scoreRecords2 = []isGameOver = FalseIsTurnWhite = Truedef Restart():    global isGameOver    global IsTurnWhite    for child in childMap:        canvas.delete(child)    childMap.clear()    isGameOver = False    IsTurnWhite = True    mapRecords1.clear()    mapRecords2.clear()    stepRecords1.clear()    stepRecords2.clear()    scoreRecords1.clear()    scoreRecords2.clear()    for i in range(mapsize):        for j in range(mapsize):            whiteBoard[j][i] = backcode            blackBoard[j][i] = backcodeWinDataSetPath = 'DataSets\\win'LosDataSetPath = 'DataSets\\los'TrainNet = Nonedef SaveDataSet(tag):    if TrainNet != None:        TrainNet(tag)    else:        winfilename = WinDataSetPath + '\\' + time.strftime("%Y%m%d%H%M%S", time.localtime()) + '.txt'        losfilename = LosDataSetPath + '\\' + time.strftime("%Y%m%d%H%M%S", time.localtime()) + '.txt'        if not os.path.exists('DataSets'):            os.mkdir('DataSets')        if not os.path.exists(WinDataSetPath):            os.mkdir(WinDataSetPath)        if not os.path.exists(LosDataSetPath):            os.mkdir(LosDataSetPath)        strInfo1 = ''        for i in range(len(mapRecords1)):            for j in range(mapsize):                for k in range(mapsize):                    strInfo1 += str(mapRecords1[i][j][k]) + ','            strInfo1 += '\n'            for j in range(mapsize):                for k in range(mapsize):                    strInfo1 += str(stepRecords1[i][j][k]) + ','            strInfo1 += '\n'        strInfo2 = ''        for i in range(len(mapRecords2)):            for j in range(mapsize):                for k in range(mapsize):                    strInfo2 += str(mapRecords2[i][j][k]) + ','            strInfo2 += '\n'            for j in range(mapsize):                for k in range(mapsize):                    strInfo2 += str(stepRecords2[i][j][k]) + ','            strInfo2 += '\n'        if tag == 1:            with open(winfilename, "w") as f:                f.write(strInfo1)            with open(losfilename, "w") as f:                f.write(strInfo2)        else:            with open(winfilename, "w") as f:                f.write(strInfo2)            with open(losfilename, "w") as f:                f.write(strInfo1)def JudgementResult():    global isGameOver    judgemap = whiteBoard    for i in range(mapsize):        for j in range(mapsize):            if judgemap[j][i] != backcode:                tag = judgemap[j][i]                checkrow = True                checkCol = True                checkLine = True                checkLine2 = True                for k in range(winSet - 1):                    if i + k + 1 < mapsize:  # 行                        if (judgemap[j][i + k + 1] != tag) and checkrow:                            checkrow = False                        if j + k + 1 < mapsize:  # 斜线                            if (judgemap[j + k + 1][i + k + 1] != tag) and checkLine:                                checkLine = False                        else:                            checkLine = False                    else:                        checkrow = False                        checkLine = False                    if j + k + 1 < mapsize:  # 列                        if (judgemap[j + k + 1][i] != tag) and checkCol:                            checkCol = False                        if i - k - 1 >= 0:  # 斜线                            if (judgemap[j + k + 1][i - k - 1] != tag) and checkLine2:                                checkLine2 = False                        else:                            checkLine2 = False                    else:                        checkCol = False                        checkLine2 = False                    if not checkrow and not checkCol and not checkLine and not checkLine2:                        break                if checkrow or checkCol or checkLine or checkLine2:                    isGameOver = True                    SaveDataSet(tag)                    return tag    return 0PlayWithComputer = NoneGetMaxScore = Nonedef playChess(event):    if isGameOver:        print('game is over, restart!')        Restart()        return    x = event.x // pixsize    y = event.y // pixsize    if x >= mapsize or y >= mapsize:        return    if whiteBoard[y][x] != backcode:        return    score = 0    if PlayWithComputer != None:        _x, _y, score = PlayWithComputer(IsTurnWhite)    res = chess(x, y, score)    if res == 0:        if PlayWithComputer != None:            x, y, score = PlayWithComputer(IsTurnWhite)            res = chess(x, y, score)def chess(x, y, score):    global IsTurnWhite    if isGameOver:        print('game is over, restart!')        Restart()        return -1    if whiteBoard[y][x] != backcode:        print('game is over, restart!')        Restart()        return -1    step = copy.deepcopy(stepBoard)    step[y][x] = 1    if IsTurnWhite:  # 白棋是人工走的 如果过用来当训练集 用反转棋盘        mapRecords1.append(copy.deepcopy(blackBoard))        stepRecords1.append(step)        scoreRecords1.append(score)        whiteBoard[y][x] = whitecode  # 1白 -1黑        blackBoard[y][x] = blackcode        child = canvas.create_oval(x * pixsize,                                   y * pixsize,                                   x * pixsize + pixsize,                                   y * pixsize + pixsize, fill='white')    else:        mapRecords2.append(copy.deepcopy(whiteBoard))        stepRecords2.append(step)        scoreRecords2.append(score)        whiteBoard[y][x] = blackcode  # 1白 -1黑        blackBoard[y][x] = whitecode        child = canvas.create_oval(x * pixsize,                                   y * pixsize,                                   x * pixsize + pixsize,                                   y * pixsize + pixsize, fill='black')    IsTurnWhite = not IsTurnWhite    childMap.append(child)    return JudgementResult()# 按钮的点击事件def AutoPlayOnce():    if PlayWithComputer != None:        x, y, score = PlayWithComputer(IsTurnWhite)        chess(x, y, score)btnAuto = tk.Button(top, text="重新开始或者自动走1次", command=AutoPlayOnce)btnAuto.pack()# 画布与鼠标左键进行绑定# canvas.bind("
", playChess)canvas.bind("
", playChess)# 按钮的点击事件def AutoPlayOne(): global isGameOver if PlayWithComputer != None: for i in range(222): if isGameOver: break x, y, score = PlayWithComputer(IsTurnWhite) chess(x, y, score)btnAuto = tk.Button(top, text="自动玩一局", command=AutoPlayOne)btnAuto.pack()canvas.bind("
", playChess)# 显示游戏窗口def ShowWind(): top.mainloop()

喜欢请记得一键三连,之后会更新更有趣的算法,欢迎大家一起交流!

转载地址:https://data-mining.blog.csdn.net/article/details/114787154 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:python_pygame_alpha-beta剪枝算法_玩中国象棋
下一篇:Python_强化学习_Q-Learning算法_二维迷宫游戏

发表评论

最新留言

感谢大佬
[***.8.128.20]2024年04月16日 17时03分10秒