您的位置:首页 → 用飞桨框架2.0造一个会下五子棋的AI模型

Gomoku与围棋、象棋相比更为简略,我们的重点是用AlphaZero在一台电脑上几小时即可培养出一个稳固无比的AI系统,其风险极低,易被轻易击败。

用飞桨框架2.0造一个会下五子棋的AI模型从小白到高手的训练之旅

你也许还记得那种让职业棋手都头疼的“阿尔法狗”,不过这有更强大的“弟弟”AlphaZero-Gomoku-PaddlePaddle,我用了飞桨框架从头学起,挑战了五子棋游戏。

五子棋游戏简介

五子棋是一种两人对弈的纯策略型棋类游戏,通常双方分别使用黑白两色的棋子,轮流在竖线与横线交叉点上摆放自己的棋子。先构成五子连线者获胜。五子棋玩法简单易学,老少皆宜,趣味盎然,引人入胜。

本项目简介

项目概述:AlphaZero在五子棋中的实现本项目是基于AlphaZero算法的实践,使用PaddlePaddle框架进行开发。此项目旨在探索如何通过纯自我博弈的方式训练出一个简单的棋盘游戏Gomoku(也称为五子棋)的AI模型。与围棋或象棋相比,五子棋虽然规则简单但落子空间较小,因此在AlphaZero算法中并未采用大量使用了残差网络的技术。相反地,项目仅采用了卷积层和全连接层作为其神经网络结构。这种简单的网络架构不仅降低了训练的复杂度,还使得本项目的实现更加容易。由于Gomoku游戏对计算机而言非常简单,因此在短时间内可以生成一个足以让你注意到的存在性AI模型。然而,即便如此,在一不留神的情况下,AI也有可能会轻松击败你。# AlphaZero与MuZero的历史渊源AlphaZero是AlphaGo Zero的“前辈”,它的出现标志着机器学习技术的一次重要突破。通过借鉴这些算法的经验和知识,本项目希望深入理解如何利用自我博弈来训练AI模型,并探索其在实际应用中的潜力。自以来,AlphaZero已经在国际象棋、围棋等领域展现了强大的实力。如今,这项技术正逐步应用于更多领域,包括音乐创作、医疗诊断等,展示了人工智能技术的无限可能。# 如何开始训练自己的五子棋AI模型为了能够运行本项目,您需要先安装PaddlePaddle框架。由于项目涉及CPU环境的使用,建议尝试GPU环境以优化性能。在启动程序之前,请确保您的电脑已配置好PaddlePaddle,并且已经安装了相应的库和依赖项。```bash pip install paddlepaddle==0 ```运行训练模型:```bash python train.py ```进行人机对战或AI互搏演示:```bash python human_play.py --grid -r 9 ``` 结论通过对AlphaZero算法的深入理解和应用,本项目旨在提供一个简单且有效的工具,帮助你开始自己的五子棋训练之旅。通过这种方式,不仅能够提高你的AI技能,还可能在日常游戏中增添更多乐趣和挑战。# 您也可以尝试参与在线对战:```bash python human_play.py --grid -r 9 ```# 尝试不同的参数设置,以调整模型训练的性能和效果。

让我们用飞桨框架2.0打造一个会下五子棋的AI模型

首先,让我们开始定义策略价值网络(PolicyValueNet)的结构。这个网络由三个主要的部分组成:公共网络层、行动策略网络层和状态价值网络层。在完成这些定义后,我们继续实现PolicyValueNet类,这是该网络的主要部分之一。该类包含两个关键方法:policy_value_fn和train_step: policy_value_fn: 这个方法主要用于蒙特卡洛树搜索(Monte Carlo Tree Search, MCTS)时评估叶子节点对应的局面评分、局中所有可行动作及其对应概率。 train_step: 这个方法用于更新自我对弈过程中收集的数据上策略价值网络的参数。主要通过训练神经网络来最小化损失,使预测结果与真实情况尽可能接近。在训练阶段,我们使用来自自我对战学习(self-play learning)得到的样本集合(s,π,z)作为模型参数训练的数据集。我们的目标是在每个输入s下,神经网络输出的p(v),和我们在MCTS中获取策略值π(z)之间的差距最小化。损失函数由三个部分组成: 均方误差(Mean Squared Error, MSE):用于评估神经网络预测的胜负结果与真实结果之间的差异。这个损失项帮助我们衡量模型对胜利概率的估计是否准确。 交叉熵损失(Cross-Entropy Loss):用于评估MCTS策略值π(z)和神经网络输出策略p(v)之间的差异。通过这种方式,我们可以监督策略值网络在预测行动选择方面是否有足够的准确性。 L则化项(LRegularization Term):这是一个额外的损失函数,用来防止模型过拟合。它是对网络权重进行加权平均的结果,使得网络整体的性能更好。总体来说,这个过程涉及定义神经网络结构、实现训练算法以及使用特定损失函数来优化网络参数,从而改进策略价值预测的质量和准确性。

%%writefile AlphaZero_Gomoku_PaddlePaddle/policy_value_net_paddlepaddle.pyimport paddleimport numpy as npimport paddle.nn as nn import paddle.nn.functional as Fclass Net(paddle.nn.Layer): def __init__(self,board_width, board_height): super(Net, self).__init__() self.board_width = board_width self.board_height = board_height # 公共网络层 self.conv1 = nn.Conv2D(in_channels=4,out_channels=32,kernel_size=3,padding=1) self.conv2 = nn.Conv2D(in_channels=32,out_channels=64,kernel_size=3,padding=1) self.conv3 = nn.Conv2D(in_channels=64,out_channels=128,kernel_size=3,padding=1) # 行动策略网络层 self.act_conv1 = nn.Conv2D(in_channels=128,out_channels=4,kernel_size=1,padding=0) self.act_fc1 = nn.Linear(4*self.board_width*self.board_height, self.board_width*self.board_height) self.val_conv1 = nn.Conv2D(in_channels=128,out_channels=2,kernel_size=1,padding=0) self.val_fc1 = nn.Linear(2*self.board_width*self.board_height, 64) self.val_fc2 = nn.Linear(64, 1) def forward(self, inputs): # 公共网络层 x = F.relu(self.conv1(inputs)) x = F.relu(self.conv2(x)) x = F.relu(self.conv3(x)) # 行动策略网络层 x_act = F.relu(self.act_conv1(x)) x_act = paddle.reshape( x_act, [-1, 4 * self.board_height * self.board_width]) x_act = F.log_softmax(self.act_fc1(x_act)) # 状态价值网络层 x_val = F.relu(self.val_conv1(x)) x_val = paddle.reshape( x_val, [-1, 2 * self.board_height * self.board_width]) x_val = F.relu(self.val_fc1(x_val)) x_val = F.tanh(self.val_fc2(x_val)) return x_act,x_valclass PolicyValueNet(): """策略&值网络 """ def __init__(self, board_width, board_height, model_file=None, use_gpu=True): self.use_gpu = use_gpu self.board_width = board_width self.board_height = board_height self.l2_const = 1e-3 # coef of l2 penalty self.policy_value_net = Net(self.board_width, self.board_height) self.optimizer = paddle.optimizer.Adam(learning_rate=0.02, parameters=self.policy_value_net.parameters(), weight_decay=self.l2_const) if model_file: net_params = paddle.load(model_file) self.policy_value_net.set_state_dict(net_params) def policy_value(self, state_batch): """ input: a batch of states output: a batch of action probabilities and state values """ # state_batch = paddle.to_tensor(np.ndarray(state_batch)) state_batch = paddle.to_tensor(state_batch) log_act_probs, value = self.policy_value_net(state_batch) act_probs = np.exp(log_act_probs.numpy()) return act_probs, value.numpy() def policy_value_fn(self, board): """ input: board output: a list of (action, probability) tuples for each available action and the score of the board state """ legal_positions = board.availables current_state = np.ascontiguousarray(board.current_state().reshape( -1, 4, self.board_width, self.board_height)).astype("float32") # print(current_state.shape) current_state = paddle.to_tensor(current_state) log_act_probs, value = self.policy_value_net(current_state) act_probs = np.exp(log_act_probs.numpy().flatten()) act_probs = zip(legal_positions, act_probs[legal_positions]) # value = value.numpy() return act_probs, value.numpy() def train_step(self, state_batch, mcts_probs, winner_batch, lr=0.002): """perform a training step""" # wrap in Variable state_batch = paddle.to_tensor(state_batch) mcts_probs = paddle.to_tensor(mcts_probs) winner_batch = paddle.to_tensor(winner_batch) # zero the parameter gradients self.optimizer.clear_gradients() # set learning rate self.optimizer.set_lr(lr) # forward log_act_probs, value = self.policy_value_net(state_batch) # define the loss = (z - v)^2 - pi^T * log(p) + c||theta||^2 # Note: the L2 penalty is incorporated in optimizer value = paddle.reshape(x=value, shape=[-1]) value_loss = F.mse_loss(input=value, label=winner_batch) policy_loss = -paddle.mean(paddle.sum(mcts_probs*log_act_probs, axis=1)) loss = value_loss + policy_loss # backward and optimize loss.backward() self.optimizer.minimize(loss) # calc policy entropy, for monitoring only entropy = -paddle.mean( paddle.sum(paddle.exp(log_act_probs) * log_act_probs, axis=1) ) return loss.numpy(), entropy.numpy()[0] def get_policy_param(self): net_params = self.policy_value_net.state_dict() return net_params def save_model(self, model_file): """ save model params to file """ net_params = self.get_policy_param() # get model params paddle.save(net_params, model_file)登录后复制

Overwriting AlphaZero_Gomoku_PaddlePaddle/policy_value_net_paddlepaddle.py登录后复制

为什么用MCTS"https://img.php.cn/upload/article/001/571/248/175306460218541.jpg" >

第一步是选择(Selection):从根节点开始,每次选择最有可能提供最佳解决方案的子节点。通常采用UCT算法评估分数最高的节点,并直到某个子节点被完全扩展为止。

在扩展阶段(Expansion),找到未扩展的子节点并添加一个新的无记录的历史子节点进行初始化。

第三步是仿真(simulation),从一个未曾尝试过的着法开始,使用简单策略如快速走子策略(Rollout policy)进行到底,得到胜负结果。快速走子策略通常适合选择走子迅速但可能不够精确的策略,因为如果策略执行缓慢,虽然准确度更高,但由于模拟时间增加,实际单位时间内可进行的模拟次数会减少,从而可能削弱棋力。这也是为什么一般只模拟一次的原因,尽管这样更准确,但速度较慢。快速走子策略的优点是能够在较短的时间内迅速做出决定,同时保持一定的决策效率。虽然可能会牺牲一些准确性,但在短时间内仍然能提供有效的评估。

第一步是回溯(backpropagation),将最后得到的胜败结果重新加回到MCTS树结构中。值得注意的是,在更新MCTS树时,除了原有的树结点需要进行修改外,新增的分支也应包含其自身的胜负历史信息。

以上就是MCTS的完整搜索过程。这是四步通常一致的方式,但根据问题和模型的复杂性有所不同。

基于神经网络的蒙特卡洛树搜索(MCTS)

首先进行选择(Selection),在蒙特卡洛树搜索(MCTS)内部,会使用一个叫做UCT(Upper Confidence Bound by Tree Search)的策略来选择子分支。在这个过程中,我们计算每个子分支的Q值和其对应的探索程度(U)。最终,我们会选择Q+U最大值的子分支作为搜索的目标,直到棋局结束或者达到MCTS树的终结节点为止。探索深度由一个系数cpuct决定,这个系数影响了我们在未探索过的情况下的行动偏好。随着树的深度增加,我们倾向于使用Q值来选择动作,因为这样可以提高我们对未来状态价值的估计准确性。

然后是扩展(Expansion) && 仿真(simulation),对于叶子节点状态 s,会利用神经网络对叶子节点进行预测,得到当前叶子节点的各个可能的子节点位置 sL 落子的概率 p 和 对应的价值 v。对于这些可能的新节点,在 MCTS 中创建出来,并初始化其分支上保存的信息为:奖励 R、价值 v 以及概率 P。

最后是回溯(backpropagation), 将新叶子节点分支的信息反向累积到父节点分支上去。这个过程非常简单,从每个叶子节点L开始,逐层向上更新其上层分支的数据结构。

MCTS搜索完毕后,模型就可以在MCTS的根节点s基于以下公式选择行棋的MCTS分支了:

神经网络探索策略调整神经网络中的探索程度可以通过参数 τ 来控制,τ 的值介于 ( 之间。当 τ 越接近于 时,神经网络的采样行为更类似于蒙特卡洛树搜索 (MCTS) 的原始策略,即采用更多的随机性;而当 τ 接近于 时,则更倾向于贪婪策略,即选择访问次数最高的动作。在探索程度较低的情况下(如 τ = ,由于直接计算 N 次 MCTS 的结果可能非常复杂和不精确,为了避免数值问题,通常会在每个动作上加上一个很小的数(本项目中为 -来处理。然后,将这个值转换成自然对数,并乘以 τ/τ 的值,最后应用一个简单的 softmax 函数来调整概率。这样的softmax方法和get_move_probs方法的主要作用是确保在低探索度的情况下,模型的预测仍然具有一定的准确性,同时避免因数值过大的问题导致的严重偏差。因此,在实际应用中,这种优化可以提供更稳定且有效的神经网络探索策略。

def softmax(x): probs = np.exp(x - np.max(x)) probs /= np.sum(probs) return probsdef get_move_probs(self, state, temp=1e-3): """按顺序运行所有播出并返回可用的操作及其相应的概率。 state: 当前游戏的状态 temp: 介于(0,1]之间的临时参数控制探索的概率 """ for n in range(self._n_playout): state_copy = copy.deepcopy(state) self._playout(state_copy) # 根据根节点处的访问计数来计算移动概率 act_visits = [(act, node._n_visits) for act, node in self._root._children.items()] acts, visits = zip(*act_visits) act_probs = softmax(1.0/temp * np.log(np.array(visits) + 1e-10)) return acts, act_probs登录后复制

关键点是什么?在每次模拟中,MCTS(Monte Carlo Tree Search)依赖于神经网络、累计价值(Q)、走法先验概率(P)以及访问对应节点频率这些数字的组合进行探索。它沿着最有希望获胜的路径前进,即具有最高置信区间上界路径的方向。为了评估未曾见过的状态优劣,MCTS会向纵深探索直至遇到从未见过的盘面状态,并使用神经网络进行评估。巧妙地结合MCTS搜索树和神经网络的优势,通过MCTS优化神经网络参数,反过来又指导神经网络进一步训练。这种智能方法使得MCTS能够更加有效地学习和预测复杂的棋局策略,从而在实际比赛中显示出显著的提升。

具体代码可以自行查看项目文件

训练算法流程

AlphaZero采用了一种独特的算法流程:通过自我对弈收集大量数据,并利用这些数据持续优化其策略和价值网络模型。在训练过程中,会将每一次自我对弈的结果作为输入,以调整现有的神经网络结构。当收集到的数据量超过设定的阈值时,AlphaZero便会自动应用策略更新机制,进一步增强其模型能力。这一过程被比喻为一个不断迭代、相互促进的学习循环,最终使AlphaZero能够持续进化和提升其围棋水平。

训练的主文件train.py,可以调整各种超参数

In []

%%writefile AlphaZero_Gomoku_PaddlePaddle/train.py#!/usr/bin/env python# -*- coding: utf-8 -*-# 对于五子棋的AlphaZero的训练的实现from __future__ import print_functionimport randomimport numpy as npimport osfrom collections import defaultdict, dequefrom game import Board, Game_UIfrom mcts_pure import MCTSPlayer as MCTS_Purefrom mcts_alphaGoZero import MCTSPlayerfrom policy_value_net_paddlepaddle import PolicyValueNet # paddlepaddleimport paddleclass TrainPipeline(): def __init__(self, init_model=None, is_shown = 0): # 五子棋逻辑和棋盘UI的参数 self.board_width = 9 为了更快的验证算法,可以调整棋盘大小为(8x8) ,(6x6) self.board_height = 9 self.n_in_row = 5 self.board = Board(width=self.board_width, height=self.board_height, n_in_row=self.n_in_row) self.is_shown = is_shown self.game = Game_UI(self.board, is_shown) # 训练参数 self.learn_rate = 2e-3 self.lr_multiplier = 1.0 # 基于KL自适应地调整学习率 self.temp = 1.0 # 临时变量 self.n_playout = 400 # 每次移动的模拟次数 self.c_puct = 5 self.buffer_size = 10000 #经验池大小 10000 self.batch_size = 512 # 训练的mini-batch大小 512 self.data_buffer = deque(maxlen=self.buffer_size) self.play_batch_size = 1 self.epochs = 5 # 每次更新的train_steps数量 self.kl_targ = 0.02 self.check_freq = 100 #评估模型的频率,可以设置大一些比如500 self.game_batch_num = 1500 self.best_win_ratio = 0.0 # 用于纯粹的mcts的模拟数量,用作评估训练策略的对手 self.pure_mcts_playout_num = 1000 if init_model: # 从初始的策略价值网开始训练 self.policy_value_net = PolicyValueNet(self.board_width, self.board_height, model_file=init_model) else: # 从新的策略价值网络开始训练 self.policy_value_net = PolicyValueNet(self.board_width, self.board_height) # 定义训练机器人 self.mcts_player = MCTSPlayer(self.policy_value_net.policy_value_fn, c_puct=self.c_puct, n_playout=self.n_playout, is_selfplay=1) def get_equi_data(self, play_data): """通过旋转和翻转来增加数据集 play_data: [(state, mcts_prob, winner_z), ..., ...] """ extend_data = [] for state, mcts_porb, winner in play_data: for i in [1, 2, 3, 4]: # 逆时针旋转 equi_state = np.array([np.rot90(s, i) for s in state]) equi_mcts_prob = np.rot90(np.flipud( mcts_porb.reshape(self.board_height, self.board_width)), i) extend_data.append((equi_state, np.flipud(equi_mcts_prob).flatten(), winner)) # 水平翻转 equi_state = np.array([np.fliplr(s) for s in equi_state]) equi_mcts_prob = np.fliplr(equi_mcts_prob) extend_data.append((equi_state, np.flipud(equi_mcts_prob).flatten(), winner)) return extend_data def collect_selfplay_data(self, n_games=1): """收集自我博弈数据进行训练""" for i in range(n_games): winner, play_data = self.game.start_self_play(self.mcts_player, temp=self.temp) play_data = list(play_data)[:] self.episode_len = len(play_data) # 增加数据 play_data = self.get_equi_data(play_data) self.data_buffer.extend(play_data) def policy_update(self): """更新策略价值网络""" mini_batch = random.sample(self.data_buffer, self.batch_size) state_batch = [data[0] for data in mini_batch] # print(np.array( state_batch).shape ) state_batch= np.array( state_batch).astype("float32") mcts_probs_batch = [data[1] for data in mini_batch] mcts_probs_batch= np.array( mcts_probs_batch).astype("float32") winner_batch = [data[2] for data in mini_batch] winner_batch= np.array( winner_batch).astype("float32") old_probs, old_v = self.policy_value_net.policy_value(state_batch) for i in range(self.epochs): loss, entropy = self.policy_value_net.train_step( state_batch, mcts_probs_batch, winner_batch, self.learn_rate * self.lr_multiplier) new_probs, new_v = self.policy_value_net.policy_value(state_batch) kl = np.mean(np.sum(old_probs * ( np.log(old_probs + 1e-10) - np.log(new_probs + 1e-10)), axis=1) ) if kl > self.kl_targ * 4: # early stopping if D_KL diverges badly break # 自适应调节学习率 if kl > self.kl_targ * 2 and self.lr_multiplier > 0.1: self.lr_multiplier /= 1.5 elif kl < self.kl_targ / 2 and self.lr_multiplier < 10: self.lr_multiplier *= 1.5 explained_var_old = (1 - np.var(np.array(winner_batch) - old_v.flatten()) / np.var(np.array(winner_batch))) explained_var_new = (1 - np.var(np.array(winner_batch) - new_v.flatten()) / np.var(np.array(winner_batch))) print(("kl:{:.5f}," "lr_multiplier:{:.3f}," "loss:{}," "entropy:{}," "explained_var_old:{:.3f}," "explained_var_new:{:.3f}" ).format(kl, self.lr_multiplier, loss, entropy, explained_var_old, explained_var_new)) return loss, entropy def policy_evaluate(self, n_games=10): """ 通过与纯的MCTS算法对抗来评估训练的策略 注意:这仅用于监控训练进度 """ current_mcts_player = MCTSPlayer(self.policy_value_net.policy_value_fn, c_puct=self.c_puct, n_playout=self.n_playout) pure_mcts_player = MCTS_Pure(c_puct=5, n_playout=self.pure_mcts_playout_num) win_cnt = defaultdict(int) for i in range(n_games): winner = self.game.start_play(current_mcts_player, pure_mcts_player, start_player=i % 2) win_cnt[winner] += 1 win_ratio = 1.0 * (win_cnt[1] + 0.5 * win_cnt[-1]) / n_games print("num_playouts:{}, win: {}, lose: {}, tie:{}".format( self.pure_mcts_playout_num, win_cnt[1], win_cnt[2], win_cnt[-1])) return win_ratio def run(self): """开始训练""" root = os.getcwd() dst_path = os.path.join(root, 'dist') if not os.path.exists(dst_path): os.makedirs(dst_path) try: for i in range(self.game_batch_num): self.collect_selfplay_data(self.play_batch_size) print("batch i:{}, episode_len:{}".format( i + 1, self.episode_len)) if len(self.data_buffer) > self.batch_size: loss, entropy = self.policy_update() print("loss :{}, entropy:{}".format(loss, entropy)) if (i + 1) % 50 == 0: self.policy_value_net.save_model(os.path.join(dst_path, 'current_policy_step.model')) # 检查当前模型的性能,保存模型的参数 if (i + 1) % self.check_freq == 0: print("current self-play batch: {}".format(i + 1)) win_ratio = self.policy_evaluate() self.policy_value_net.save_model(os.path.join(dst_path, 'current_policy.model')) if win_ratio > self.best_win_ratio: print("New best policy!!!!!!!!") self.best_win_ratio = win_ratio # 更新最好的策略 self.policy_value_net.save_model(os.path.join(dst_path, 'best_policy.model')) if (self.best_win_ratio == 1.0 and self.pure_mcts_playout_num < 8000): self.pure_mcts_playout_num += 1000 self.best_win_ratio = 0.0 except KeyboardInterrupt: print('\n\rquit')if __name__ == '__main__': device = paddle.get_device() paddle.set_device(device) is_shown = 0 # model_path = 'dist/best_policy.model' model_path = 'dist/current_policy.model' training_pipeline = TrainPipeline(model_path, is_shown) # training_pipeline = TrainPipeline(None, is_shown) training_pipeline.run()登录后复制

Overwriting AlphaZero_Gomoku_PaddlePaddle/train.py登录后复制

开始训练和评估:

In []

!pip install pygame %cd AlphaZero_Gomoku_PaddlePaddle !python train.py登录后复制

经过每固定步数的训练,系统会自动与自己对弈,并对结果进行评估和保存。这样可以保持之前的训练状态,以便继续进步。

最后再介绍下MuZero

MuZero是AlphaZero的后续产品。与AlphaGo、AlphaZero相似,它采用了MCTS与神经网络相结合的方法,预测并选择当前环境下的最佳行动。然而,MuZero无需提供规则手册,只需通过自我试验即可学会象棋、围棋及其他多个Atari游戏。此外,它能综合考虑游戏环境的所有方面来评估局面的优劣,并且还能从自身错误中学习,从而改进策略。

以上就是用飞桨框架2.0造一个会下五子棋的AI模型的详细内容,更多请关注其它相关文章!

  • 标签: