深層強化学習(Deep Q Network, DQN)の簡単な例 〜Experience Replay追加〜
はじめに
前回の記事でOpenAI Gymを使わず非常に簡単な問題を対象にDQNを適用してみたが、"Experience Replay"を入れていなかった。今回は前回の問題にExperience Replayを追加してみる。なおこれを実施するにあたり、下記サイトを参考にした。
第15回 CartPole課題で深層強化学習DQNを実装|Tech Book Zone Manatee
Reinforcement Learning (DQN) Tutorial — PyTorch Tutorials 1.5.1 documentation
Experience Replay
行動とその結果を経験として記録しておき、その過去の経験をサンプリングして学習データとする方法。通常の逐次で学習する方法に比べ、時間的に離れたデータを使用することができ、偏りを抑えた安定した学習ができるとのこと。複数データをまとめてニューラルネットワークにミニバッチ学習させることもできる。
具体的には「現在の状態」、「行動」、「行動後の状態」、「報酬」のデータの組をある一定の数だけメモリに残しておいて、その中から学習に使うデータをいくつかランダムにサンプリングし、これらを1つのバッチとしてニューラルネットワークに学習させる。
対象とする問題
前回記事と同一の報酬払出装置を考える。プログラムは前回のものにいくつか追加・修正してExperience Replayを適用する。
深層強化学習(Deep Q Network, DQN)の簡単な例 - 化学系エンジニアがAIを学ぶ
Experience Replayの実装
下記のpytorch.orgのチュートリアルにあるように、namedtupleを用いて経験を記録するクラスを追加する。
Reinforcement Learning (DQN) Tutorial — PyTorch Tutorials 1.5.1 documentation
ここでは500回分の行動の結果を記録することにしている(数は適当)。500を超えると古い方から順番に上書きされる形となっている。サンプリング時は指定したバッチサイズ数だけのデータが取り出される。
from collections import namedtuple # 「現在の状態」、「行動」、「行動後の状態」、「報酬」を記録するnamedtuple Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward')) import random class ReplayMemory(object): def __init__(self, capacity): self.capacity = capacity # メモリの許容サイズ self.memory = [] # 経験を保存するリスト self.index = 0 # 保存した経験リストのindex def push(self, state, action, next_state, reward): # 経験をリストに追加(push)する if len(self.memory) < self.capacity: # 許容サイズ以下なら追加 self.memory.append(None) self.memory[self.index] = Transition(state, action, next_state, reward) self.index = (self.index + 1) % self.capacity # indexを1つ進める。許容サイズ超えたら古い順から上書き def sample(self, batch_size): # バッチサイズ分の経験のランダムサンプリング return random.sample(self.memory, batch_size) def __len__(self): return len(self.memory) CAPACITY = 500 memory = ReplayMemory(CAPACITY)
DQNの学習
上記の行動を記録したクラスを利用できる形に、DQNの学習部分を修正する。
BATCH_SIZE = 50 # サンプリングするデータ数 def update_dqn(replay_memory): # 行動を記録したクラスを引数とする ## メモリがバッチサイズより小さいときはまだ学習しない if len(replay_memory) < BATCH_SIZE: return ## バッチ取得 transitions = replay_memory.sample(BATCH_SIZE) ## (状態、行動、次の状態、報酬)✕バッチサイズ を (状態xバッチサイズ、行動✕バッチサイズ、、、)に変える batch = Transition(*zip(*transitions)) ## 各値をtensorに変換 state = torch.FloatTensor(batch.state).unsqueeze(1) action = torch.LongTensor(batch.action).unsqueeze(1) next_state = torch.FloatTensor(batch.next_state).unsqueeze(1) reward = torch.FloatTensor(batch.reward).unsqueeze(1) ## Q値の算出 q_now = dqn(state).gather(-1, action) # 今の状態のQ値 max_q_next = dqn(next_state).max(-1)[0].unsqueeze(1).detach() # 状態移行後の最大のQ値 gamma = 0.9 q_target = reward + gamma * max_q_next # 目標のQ値 # DQNパラメータ更新 optimizer.zero_grad() loss = criterion(q_now, q_target) # 今のQ値と目標のQ値で誤差を取る loss.backward() optimizer.step() return loss.item() # lossの確認のために返す
学習結果
他のコードは前回記事と同じである。下図に学習の推移を示す。左は誤差の推移、右は1エピソードにおけるトータル報酬の推移である。実際に計算してみるとわかるが逐次で学習させるより、Experience Replayを適用するほうが学習が安定しやすい。
コード
参考
・第15回 CartPole課題で深層強化学習DQNを実装|Tech Book Zone Manatee
・Reinforcement Learning (DQN) Tutorial — PyTorch Tutorials 1.5.1 documentation
深層強化学習(Deep Q Network, DQN)の簡単な例
はじめに
DQNを学ぼうとして色々と調べたが、どこもかしこもOpenAI Gymを使っていて、まずそれの扱いから考えないといけないのでつらい。ここではOpenAI Gymを使わず、非常に簡単な問題を対象にDQNを適用してみることとする。Python、PyTorchを用いる。
また、DQNをやろうとすると出てくる"Experience Replay"。これも入れようとすると考えるのがつらいので、入れない。
下記記事にDQN全般やExperience Replayについてちょっと解説がある。
Deep Q Network (DQN) - DeepLearningを勉強する人
対象とする問題
下図のような報酬払出装置を考える。装置には「電源」ボタンと「払出」ボタンが設置されている。「電源」ボタンを押すと装置電源のON/OFFが切り替わる。電源ONのときに「払出」ボタンを押すと報酬が払い出される。電源OFFのときに「払出」ボタンを押しても報酬は払い出されない。
1回の行動で、いずれかのボタンを1度押すことができるものとする。5回の行動を行う場合に、報酬を最大化する手順を学習することができるか?
なお、この問題の内容に関して下記書籍を参考にした。
この払出装置(Dispenser)をプログラムにすると次の通りとなる。
class Dispenser(object): def __init__(self, init_state): """ 初期のON/OFF状態を設定する init_state: 0->電源OFF、1->電源ON """ self.state = init_state def powerbutton(self): """ 電源ボタンを押し、ON/OFFを切り替える """ if self.state == 0: self.state = 1 else: self.state = 0 def step(self, action): """ 払出機を操作する action: 0->電源ボタンを押す 1->払出ボタンを押す 状態と報酬が返る """ if action == 0: # 電源ボタンを押した場合 self.powerbutton() # 電源ON/OFF切り替え reward = 0 # 報酬はない else: # 払出ボタンを押した場合 if self.state == 1: reward = 1 # 電源ONのときのみ報酬あり else: reward = 0 return self.state, reward
Deep Q Network (DQN)
報酬払出機の状態は2つ、取れる行動は2つであるので、これを表すQテーブルは次のようになる。
Q学習ではこのQテーブルのQ値(Q(0,0), Q(0,1), Q(1,0), Q(1,1))を逐次更新していくが、DQNではこれらのQ値を得る関数を逐次更新していく。その関数を表すのにニューラルネットワークを用いるが、次のようなイメージとなる。
状態を入力として、Q値を出力として得る形である。ここでは上図の通りのニューラルネットワークを用いることとする。(あまりディープではないが・・・)
import torch import torch.nn as nn import torch.nn.functional as F class DQN(nn.Module): def __init__(self): super(DQN, self).__init__() self.l1 = nn.Linear(1, 3) self.l2 = nn.Linear(3, 3) self.l3 = nn.Linear(3, 2) def forward(self, x): x = F.relu(self.l1(x)) x = F.relu(self.l2(x)) x = self.l3(x) return x dqn = DQN() optimizer = torch.optim.SGD(dqn.parameters(), lr=0.01) criterion = nn.MSELoss()
DQNの学習方法
では、どのような情報を使ってこのニューラルネットワークを学習させていくのか、ということになるが、これにはQ学習のQ値の更新式を用いる。
この式は今のQ値()を、今の推定の目標値()に近づくよう更新するものである。すなわち、
となることを目指している。よってとの誤差を小さくするようにニューラルネットワークを学習させるという形にすればよいことになる。いずれの値も現在の状態、行動、次の状態、報酬が分かれば算出可能であり、DQNの学習をプログラムにすると次のようになる。
def update_dqn(state, action, next_state, reward): ## 各変数をtensorに変換 state = torch.FloatTensor([state]) action = torch.LongTensor([action]) # indexとして使うのでLong型 next_state = torch.FloatTensor([next_state]) ## Q値の算出 q_now = dqn(state).gather(-1, action) # 今の状態のQ値 max_q_next = dqn(next_state).max(-1)[0].detach() # 状態移行後の最大のQ値 gamma = 0.9 q_target = reward + gamma * max_q_next # 目標のQ値 # DQNパラメータ更新 optimizer.zero_grad() loss = criterion(q_now, q_target) # 今のQ値と目標のQ値で誤差を取る loss.backward() optimizer.step() return loss.item() # lossの確認のために返す
DQNからの行動の決定
DQNは入力された状態に対し、取りうるすべての行動のQ値のtensorを返す(1次元tensor)。そこから最大のQ値のindexを取り出して(0または1)、それをそのまま行動としている。
import numpy as np EPS_START = 0.9 EPS_END = 0.0 EPS_DECAY = 200 def decide_action(state, episode): state = torch.FloatTensor([state]) # 状態を1次元tensorに変換 ## ε-グリーディー法 eps = EPS_END + (EPS_START - EPS_END) * np.exp(-episode / EPS_DECAY) if eps <= np.random.uniform(0, 1): with torch.no_grad(): action = dqn(state).max(-1)[1] # Q値が最大のindexが得られる action = action.item() # 0次元tensorを通常の数値に変換 else: num_actions = len(dqn(state)) # action数取得 action = np.random.choice(np.arange(num_actions)) # ランダム行動 return action
学習する
現在の状態からDQNより行動を決めて、環境から次の状態・報酬を取得し、これらの結果を用いてDQNを更新する、を繰り返す。エピソードの数や上記のoptimizerの学習率、ε-グリーディー法のepsの変化のさせ方はうまく学習が進むように調節して決めている。(適度な値を決めるのにかなり苦労した。)
import matplotlib.pyplot as plt NUM_EPISODES = 1200 NUM_STEPS = 5 log = [] # 結果のプロット用 for episode in range(NUM_EPISODES): env = Dispenser(0) total_reward = 0 # 1エピソードでの報酬の合計を保持する for s in range(NUM_STEPS): ## 現在の状態を確認 state = env.state ## 行動を決める action = decide_action(state, episode) ## 決めた行動に従いステップを進める。次の状態、報酬を得る next_state, reward = env.step(action) ## DQNを更新 loss = update_dqn(state, action, next_state, reward) total_reward += reward log.append([total_reward, loss]) ## 結果表示 r, l = np.array(log).T fig = plt.figure(figsize=(11,4)) ax1 = fig.add_subplot(121) ax2 = fig.add_subplot(122) ax1.set_xlabel("Episode") ax1.set_ylabel("Loss") ax1.set_yscale("log") ax2.set_xlabel("Episode") ax2.set_ylabel("Total reward") ax1.plot(l) ax2.plot(r) plt.show()
学習結果
下図に学習の推移を示す。左は誤差の推移、右は1エピソードにおけるトータル報酬の推移である。払出機は初め電源OFFのため、5回の行動で得られる最大のトータル報酬は4であるが、右下のグラフから最終的にトータル報酬4が得られる行動を適切に学習できていることを確認できる。
学習後のDQNから各状態におけるQ値を確認できるが、今回の学習での値は下記である。
>>> dqn(torch.FloatTensor([0])) tensor([9.0000, 8.1200], grad_fn=<AddBackward0>) >>> dqn(torch.FloatTensor([1])) tensor([8.1060, 10.0000], grad_fn=<AddBackward0>)
計算の詳細は示さないが、今回と同一の問題にQ学習を適用するとこれらとほぼ同じQ値が得られ、DQNによってQ値を適切に表現する関数を構築できていることを確認できる。
さいごに
DQNを適用するのが全く意味がないような簡単な問題にDQNを適用した。"Experience Replay"も考えなかった(状態✕行動の組み合わせが4パターンしかない問題なのであまり効果はないのかも)。簡単すぎる問題であるがDQNを適用する上での基礎的なポイントは掴めたのではないかと思う。
コード
ここで示した全コードおよびQ学習を適用した場合のコードはこちら:
参考
・Reinforcement Learning (DQN) Tutorial — PyTorch Tutorials 1.1.0 documentation
反応器計算におけるQ学習2
はじめに
以前の記事で管型反応器において所望の反応率を得るという問題について強化学習(Q学習)を適用したが、 今回は反応率に応じて目的成分の選択率が変化するという条件で、収率を最大化する問題を対象とする。
管型反応器の例題
対象とする問題は下記の通り。
- 成分Aを反応して目的の生成物Bを得る管型反応器がある
- 反応器は連続運転しており、1日に1回だけAの反応率とBの収率を確認でき、その際に反応温度を変更することができる
- 反応温度は1回につき0.5℃変更できる(上げることも、下げることも、変えないことも可能)
- 反応温度を変更することで反応率を変化させることができる
- 生成物Bと一緒に副生物Cも生成し、反応率に応じて生成物Bの選択率は変化する
ある初期温度で運転を開始したのちに、収率を最大化することを学習できるか?
ここで反応温度・反応率・選択率・収率の関係は図中の式で表されるものとし、 本例題ではk=0.01、a=100とする。 反応温度と反応率、反応率と選択率の関係は下記グラフとなる。
なお、収率が最大となる条件はこの時点で計算可能であり、反応率0.955、収率0.945である。 これを強化学習で推算できるかがポイントとなる。
管型反応器のモデル
反応器の状態と行動を次の表の通り整理する。 状態は反応率を離散化して表現する。ここでは反応率0.9未満を状態0とし、0.9から1.0を100分割して番号を振った。 (収率が最大となる反応率が少なくとも0.9以上とわかっている前提)
報酬はとし、1に満たない分をマイナスの報酬(ペナルティ)として与える形とした。
import numpy as np import matplotlib.pyplot as plt class Tubular_selec(object): def __init__(self, temp): self._k = 0.01 # kを0.01とする self._a = 100.0 # aを100とする init_conv = self.calc_conv(self._k, temp) init_yield = self.calc_yield(self._a, init_conv) ## ログとして[温度, 反応率, 収率]を残す self._log = [[temp, init_conv, init_yield]] def calc_conv(self, k, temp): ## 反応率計算 return 1 - np.exp(-k * temp) def calc_selc(self, a, conv): ## 選択率計算 return 1 - conv**a def calc_yield(self, a, conv): ## 収率計算 return conv * self.calc_selc(a, conv) def state(self): ## 状態を返す 状態を転化率0.9-1.0の間で100分割する ## 0.9未満を状態0とするので、状態は全部で101個 _, conv, _ = self._log[-1] # 一番最新のlogからデータ取得 n = 100 if conv < 0.9: # 反応率 0.1未満は状態0とする st = 0 else: st = int(np.ceil((conv - 0.9)*10*n)) # 0.9~1.0をn分割 return st def step(self, action): temp, _, _ = self._log[-1] # 最新の温度を取得 ## 行動0で温度UP、行動2で温度Down、行動1は何もしない if action == 0: temp += 0.5 elif action == 2: temp -= 0.5 conv = self.calc_conv(self._k, temp) yild = self.calc_yield(self._a, conv) self._log.append([temp, conv, yild]) # 1.0に満たない分を負の報酬(ペナルティ)として与える reward = - abs(1.0 - yild) return reward
Q値に従った次の行動の決定
def decide_action(qtable, state, episode): eps = 1 - episode/240 # ε-グリーディー法 if eps <= np.random.uniform(0, 1): # グリーディー行動 t = np.where(qtable[state]==qtable[state].max())[0] # Q値が最大のインデックスを返す else: # ランダム行動 t = np.arange(qtable.shape[-1]) # 取りうる行動すべて return np.random.choice(t) # 行動の候補からランダムに選ぶ
Q値の更新
def update_qtable(qtable, state, action, reward, next_state): gamma = 0.9 alpha = 0.5 next_qmax = max(qtable[next_state]) qtable[state, action] = (1 - alpha) * qtable[state, action] + alpha * ( reward + gamma * next_qmax) return qtable
学習する
1episodeあたり100日運転することとして、300episode学習させる。
OPERATION_PERIOD = 100 # 100日分 EPISODE_NUM = 300 qtable = np.zeros((101, 3)) # サイズは状態数✕行動数 ## 学習状況確認用グラフ設定 fig = plt.figure(figsize=(11,4)) ax1 = fig.add_subplot(121) ax2 = fig.add_subplot(122) ## 学習 for episode in range(EPISODE_NUM): #t = np.random.uniform(280, 350) # 初期温度をランダムにする場合 env = Tubular_selec(280) for d in range(OPERATION_PERIOD): ## まず現在の状態を確認 state = env.state() ## 次に行動を決める action = decide_action(qtable, state, episode) ## 決めた行動に従い運転を1日進める.また報酬を環境から得る reward = env.step(action) ## 新しい状態を確認 next_state = env.state() ## Qテーブル更新 qtable = update_qtable(qtable, state, action, reward, next_state) ## 10episodeにつき1回のグラフ描画 if (episode+1)%10 == 0: t, c, y = np.array(env._log).T ax1.cla() ax2.cla() ax1.set_xlabel("Days") ax1.set_ylabel("Temperature") ax1.set_ylim(270,320) ax2.set_xlabel("Days") ax2.set_ylabel("Conversion, Yield") ax2.set_ylim(0.93,0.96) ax1.plot(t) ax2.plot(c, label="Conversion") ax2.plot(y, label="Yield") ax2.legend() plt.pause(0.1)
学習結果
収率最大となるのは反応率0.955、収率0.945のときであるが、 反応温度を上げて、まっすぐその状態に近づいていることが確認できる。
反応器計算におけるQ学習
はじめに
簡単な例として、管型反応器において反応温度を調節して所望の反応率を得るという問題について強化学習(Q学習)を適用してみる。
管型反応器の例題
対象とする問題は下記の通り。
- 成分Aを反応して生成物Bを得る管型反応器がある
- 反応器は連続運転しており、1日に1回だけ反応率を確認でき、その際に反応温度を変更することができる
- 反応温度は1回につき0.5℃変更できる(上げることも、下げることも、変えないことも可能)
- 反応温度を変更することで反応率を変化させることができる
ある初期温度で運転を開始したのちに所望の反応率に調整することを学習できるか。
ここで反応率と反応温度の関係は図中の式で表されるものとし、本例題ではk=0.01とする。 反応率と温度は下記グラフの関係となる。
管型反応器のモデル
反応器の状態と行動を次の表の通り整理する。ここでは反応率の目標を0.9599〜0.96としている。 状態は目標より低い、同じ、高い、の3つとし、行動は温度を上げる、何もしない、温度を下げる、の3つとする。 反応率が目標と同じで、かつ何もしないときに報酬が得られるものとする。
import numpy as np import matplotlib.pyplot as plt class Tubular(object): def __init__(self, temp): self._k = 0.01 # kを0.01とする ## ログとして[反応温度, 反応率]を残す self._log = [[temp, self.calc_conv(self._k, temp)]] def calc_conv(self, k, temp): ## 反応率を計算 return 1 - np.exp(-k * temp) def state(self): ## 状態を返す 反応率が目標内なら1、それより下は0、上は2 _, conv = self._log[-1] # 一番最新のlogからデータ取得 if conv < 0.9599: st = 0 elif conv > 0.9600: st = 2 else: st = 1 return st def step(self, action): ## 計算を1日分進めるステップ計算 ## まず報酬計算 状態1で行動1(何もしない)のとき報酬あり if self.state()==1 and action==1: reward = 1 else: reward = 0 ## 1日分進める temp, _ = self._log[-1] # 一番最新のlogからデータ取得 if action == 0: temp += 0.5 elif action == 2: temp -= 0.5 conv = self.calc_conv(self._k, temp) self._log.append([temp, conv]) return reward
Q値に従った次の行動の決定
def decide_action(qtable, state, episode): eps = 0.8 - 0.8*episode/800 # ε-グリーディー法 if eps <= np.random.uniform(0, 1): ## Q値が最大のインデックスを返す t = np.where(qtable[state]==qtable[state].max())[0] else: ## 取りうる行動すべてを返す t = np.arange(qtable.shape[-1]) return np.random.choice(t) # 行動の候補からランダムに選ぶ
Q値の更新
def update_qtable(qtable, state, action, reward, next_state): gamma = 0.9 alpha = 0.5 next_qmax = max(qtable[next_state]) qtable[state, action] = (1 - alpha) * qtable[state, action] + alpha * ( reward + gamma * next_qmax) return qtable
学習する
1episodeあたり100日運転することとして、1000episode学習させる。 各episodeの初期の反応温度はランダムに決める。
operation_period = 100 # 100日分 episode_num = 1000 qtable = np.zeros((3, 3)) # サイズは状態数✕行動数 for episode in range(episode_num): t = np.random.uniform(300, 350) env = Tubular(t) for d in range(operation_period): ## まず現在の状態を確認 state = env.state() ## 次に行動を決める action = decide_action(qtable, state, episode) ## 決めた行動に従い運転を1日進める.また報酬を環境から得る reward = env.step(action) ## 新しい状態を確認 next_state = env.state() ## Qテーブル更新 qtable = update_qtable(qtable, state, action, reward, next_state)
学習結果
Qテーブルの一例。反応率が目標未満なら温度を上げ、目標より高いなら温度を下げるような値になっている。
# 温度上げる 何もしない 温度下げる array([[1.97888446e-02, 3.12142611e-12, 1.27102555e-18], # 反応率目標未満 [2.34887193e+00, 1.00000000e+01, 5.69672696e+00], # 目標内 [7.11430684e-27, 7.22125479e-27, 1.91366261e-02]]). # 目標より高い
次の結果確認用コードを用いて各種初期温度を計算すると、いずれも最終的に所望の反応率に到達していることを確認できる。
def suisan(init_temp): env = Tubular(init_temp) for d in range(operation_period): state = env.state() action = decide_action(qtable, state, 10000) # episode数は適当に大きい値を入れる。探索をしないため。 _ = env.step(action) t, c = np.array(env._log).T fig = plt.figure(figsize=(11,4)) ax1 = fig.add_subplot(121) ax2 = fig.add_subplot(122) ax1.set_xlabel("Days") ax2.set_xlabel("Days") ax1.set_ylabel("Temperature") ax2.set_ylabel("Conversion") ax1.plot(t) ax2.plot(c) plt.show()
初期温度 310
初期温度 340
メモ: PyTorch tensor requires_gradのTrue/False確認、切り替え
requires_gradのTrue/False確認
属性 requires_gradを参照して確認できる。
import torch a = torch.tensor([0, 1], dtype=torch.float32) b = torch.tensor([2, 3], dtype=torch.float32, requires_grad=True) a.requires_grad b.requires_grad
実行結果
False True
requires_gradのTrue/False切り替え
メソッド requires_grad()で切り替えられる
a.requires_grad_(True)
a.requires_grad
True # <- FalseからTrueに変わった
メモ: PyTorchの自動微分について
はじめに
PyTorchの自動微分についてのメモを記す。
自動微分
アルゴリズムによって定義された関数からその関数の偏導関数値を計算するアルゴリズムを導出するための技術。 数値微分や数式微分とも異なる方法で、これら方法の問題点(誤差や桁落ちによる精度低下、プログラムから微分表現を導けない)を 解消できる手段とのこと。
PyTorchでの自動微分の例
- tensorを作るときに、
requires_grad=True
とする。 - テンソル演算する。 (ex. y = x **2)
y.backward()
で勾配を計算する。x.grad
で勾配値が得られる
import torch x = torch.tensor(1.0, requires_grad=True) # require_grad=Trueが必要。defaultはFalse y = x ** 2 y.backward() # 勾配を求める print(x.grad) # xにおける勾配の値がx.gradに入っている x = torch.tensor(1.0, requires_grad=True) y = 2 ** x y.backward() print(x.grad) x1 = torch.tensor(1.0, requires_grad=True) x2 = torch.tensor(1.0, requires_grad=True) y = x1**2 + torch.sqrt(x1*x2) # 偏微分のケース y.backward() print(x1.grad, x2.grad)
実行結果
tensor(2.) tensor(1.3863) tensor(2.5000) tensor(0.5000)
参考
スキナー箱のQ学習
はじめに
ネズミがボタンを押して餌を得るという問題について強化学習(Q学習)を適用してみる。 本記事の内容に関して下記書籍を参考にした。
ネズミの学習問題
対象とする問題は下記の通り。
- 箱の中にネズミが1匹いる
- 箱の中にネズミ餌のディスペンサーが1つある
- ディスペンサーには「電源ボタン」と「餌ボタン」がある
- 「電源ボタン」をネズミが押すたびに電源のON/OFFが切り替わる
- ディスペンサーの電源がONのときにネズミが「餌ボタン」を押すと餌が出てくる
ネズミは餌を得る手順を学習できるか。
ディスペンサーのモデル
まず、電源のON/OFF状態を保持し、ON状態で「餌ボタン」が押されると餌(報酬)を返すクラスを作成する。
import numpy class Dispenser(object): def __init__(self, init_state): """ 初期のON/OFF状態を設定する init_state: 0->電源OFF、1->電源ON """ self._state = init_state self._button = self.powerbutton(init_state) def powerbutton(self, init_state): """ ON/OFFを切り替えるgenerator 0: 電源OFF、1: 電源ON を返す。内部的には-1, 1で切り替えている """ state = -1 if init_state==0 else init_state while True: state = -1 * state yield numpy.maximum(0, state) def push_powerbutton(self): self._state = self._button.__next__() def step(self, action): """ action: 0->電源ボタンを押す 1->餌ボタンを押す """ if action == 0: self.push_powerbutton() reward = 0 else: reward = 1 if self._state==1 else 0 return self._state, reward
挙動としては下記。電源ボタンを押すとON/OFFが切り替わり、ONのとき餌ボタンを押すと餌(報酬 1)が得られる。 具体的には(状態, 報酬)のタプルが返る。これだけ。
>>> env = Dispenser(0) # 電源OFFを初期状態としてディスペンサーオブジェクトを作成 >>> env.step(0) # 0:電源ボタンを押す (1, 0) # 状態:電源ON, 報酬:0 が返る >>> env.step(1) # 1: 餌ボタンを押す (1, 1) # 状態:電源ON, 報酬:1が返る
Q値
各状態および行動に対するQ値の表を次の通り決める。
Q(0, 0): 状態 電源OFF、行動 電源ボタンを押す
Q(0, 1): 状態 電源OFF、行動 餌ボタンを押す
Q(1, 0): 状態 電源ON、 行動 電源ボタンを押す
Q(1, 1): 状態 電源ON、 行動 餌ボタンを押す
各Q値は下記の通り配列で保持することとする。初期値は全て0。
qtable = numpy.zeros((2, 2))
Q値に従って次の行動を決める
Q値および現在の状態から次に取る行動を決める。基本的にはQ値が最大の行動を取るが、 時折ランダムな行動を取る - グリーディー法を取り入れることとする。
def get_action(qtable, state, episode): """ 現在の状態とQ値から次の行動を決める episode回数が増えるごとにランダム行動を取らなくする """ ## ε- greedy法 行動を重ねる(episodeが増える)毎にランダム行動を取らなくする epsilon = 0.5 * (1 / (episode + 1)) if epsilon <= numpy.random.uniform(0, 1): ## まずQ値が最大の要素を確認(必ずしも1つでないので) ## 最大値がTrue、それ以外はFalseとなっているリストを得る qmax_bool_list = qtable[state]==qtable[state].max() ## Qが最大値の要素のインデックスを得る(複数あるときはその中からランダムに選ぶことになる) choices = numpy.where(qmax_bool_list)[0] else: ## ランダム行動。全要素のインデックスを得る choices = numpy.arange(len(qtable[state])) return numpy.random.choice(choices) # 候補の中から1つランダムに選ぶ
Q値を更新する
状態、行動および行動を取った結果の報酬から下記式に従いQ値を更新する。 は状態、は行動、は報酬、は次の状態での最大のQ値。 は学習率、は割引率と呼ばれる。
def update_qtable(qtable, state, action, next_state, reward): gamma = 0.9 alpha = 0.5 next_qmax = max(qtable[next_state]) qtable[state, action] = (1 - alpha) * qtable[state, action] +\ alpha * (reward + gamma * next_qmax) return qtable
学習する
1episodeあたり5回行動することとして、10episode学習させる。 各episodeの初期のディスペンサー電源ON/OFFはランダムに決める。
MAX_STEP_NUMBER = 5 # 1episodeあたり5回行動することとする MAX_EPISODE_NUMBER = 10 # episodeは10までとする for episode in range(MAX_EPISODE_NUMBER): total_reward = 0 state = numpy.random.choice([0, 1]) # ディスペンサー電源初期状態はランダム env = Dispenser(state) for step in range(MAX_STEP_NUMBER): action = get_action(qtable, state, episode) # q値、状態から行動を決める next_state, reward = env.step(action) # 行動を取った結果を得る print(state, action, reward) total_reward += reward qtable = update_qtable(qtable, state, action, next_state, reward) # Q値の更新 state = next_state print('episode : %d total reward %d' %(episode+1, total_reward)) print(qtable)
学習結果
初期が電源OFFならtotal rewardは最大4、ONなら最大5となるが、episode 9, 10ではいずれもrewardは最大となっている。 初期ON/OFFがランダムでも正しく行動が取られている。
>>> 0 1 0 0 0 0 1 1 1 1 1 1 1 1 1 episode : 1 total reward 3 [[ 0. 0. ] [ 0. 1.42625]] 0 0 0 1 0 0 0 0 0 1 1 1 1 0 0 episode : 2 total reward 1 [[ 0.96271875 0. ] [ 0.57763125 1.8549375 ]] . . . 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 episode : 9 total reward 4 [[ 6.00931629 1.23142328] [ 1.25417229 7.85361236]] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 episode : 10 total reward 5 [[ 6.00931629 1.23142328] # 電源OFF時のQ値、電源ボタンを押すQ値が大きい [ 1.25417229 8.33916616]] # 電源ON時のQ値、餌ボタンを押すQ値が大きい
全コード
import numpy class Dispenser(object): def __init__(self, init_state): """ 初期のON/OFF状態を設定する init_state: 0->電源OFF、1->電源ON """ self._state = init_state self._button = self.powerbutton(init_state) def powerbutton(self, init_state): """ ON/OFFを切り替えるgenerator 0: 電源OFF、1: 電源ON を返す。内部的には-1, 1で切り替えている """ state = -1 if init_state==0 else init_state while True: state = -1 * state yield numpy.maximum(0, state) def push_powerbutton(self): self._state = self._button.__next__() def step(self, action): """ action: 0->電源ボタンを押す 1->餌ボタンを押す """ if action == 0: self.push_powerbutton() reward = 0 else: reward = 1 if self._state==1 else 0 return self._state, reward def get_action(qtable, state, episode): ## ε- greedy法 行動を重ねる(episodeが増える)毎にランダム行動を取らなくする epsilon = 0.5 * (1 / (episode + 1)) if epsilon <= numpy.random.uniform(0, 1): ## Q値に従った行動。Q値が最大の行動を取る ## まずQ値が最大の要素を確認(必ずしも1つでないので) qmax_bool_list = qtable[state]==qtable[state].max() # 複数あるときはその中からランダムに選ぶ choices = numpy.where(qmax_bool_list)[0] else: ## ランダム行動 choices = numpy.arange(len(qtable[state])) return numpy.random.choice(choices) def update_qtable(qtable, state, action, next_state, reward): gamma = 0.9 alpha = 0.5 next_qmax = max(qtable[next_state]) qtable[state, action] = (1 - alpha) * qtable[state, action] +\ alpha * (reward + gamma * next_qmax) return qtable qtable = numpy.zeros((2, 2)) MAX_STEP_NUMBER = 5 # 1episodeあたり5回行動することとする MAX_EPISODE_NUMBER = 10 # episodeは10までとする for episode in range(MAX_EPISODE_NUMBER): total_reward = 0 state = numpy.random.choice([0, 1]) # ディスペンサー電源初期状態はランダム env = Dispenser(state) for step in range(MAX_STEP_NUMBER): action = get_action(qtable, state, episode) # q値、状態から行動を決める next_state, reward = env.step(action) # 行動を取った結果を得る print(state, action, reward) total_reward += reward qtable = update_qtable(qtable, state, action, next_state, reward) # Q値の更新 state = next_state print('episode : %d total reward %d' %(episode+1, total_reward)) print(qtable)