化学系エンジニアがAIを学ぶ

PyTorchでディープラーニング、強化学習を学び、主に化学工学の問題に取り組みます

CNN(Convolutional Neural Network)を用いた画像識別の簡単な例

はじめに

CNNによる画像識別の簡単な例として、下記の波形図の1つ目のピークが高いか2つ目のピークが高いかの識別をPyTorchを使って試みる。

f:id:schemer1341:20200223215448p:plain:w500

データの準備

画像をCNNで使える配列の形に変換する方法はいくつかあると思うが、ここではpillowを用いた。波形の画像データがフォルダ「testfig」に置いてあるものとして、下記内容でデータを準備した。 (ここでは識別のラベルを、1つ目のピークが高い→0、2つ目のピークが高い→1とし、ファイル名を例えば0-23.pngのように(ラベル)-(画像番号).pngの形式として、ファイル名からラベルを読み込む形としている)

画像データ

ここで気をつけないといけないのは配列の形状である。PyTorchのCNNの入力の形状[バッチサイズ, チャンネル数, 高さ(ピクセル), 幅(ピクセル)]に合わせないといけない。チャンネル数は画像の奥行き(または深さ)であり、画像の場合は色に対応する。チャンネル数は例えばカラーならR, G, Bの3つ、モノクロなら1つ、などになる。

import glob
from PIL import Image
import numpy as np

# フォルダ testfig においた画像ファイルのリストを取得
image_path = "./testfig"
files = glob.glob(image_path + "/*.png")

x = []
y = []

for filename in files:
    # 画像ファイルをndarrayに変換する
    im = Image.open(filename)
    im = im.convert("1")  # モノクロ画像に変換
    im = im.resize((28, 28))  # 28x28にリサイズ
    im = np.array(im).astype(int)  # intのarrayに変換
    x.append([im])  # CNNの入力に合うshapeとなるよう注意

    # ファイル名から正解ラベルを取得
    label = int(filename[len(image_path)+1])
    y.append(label)

x = np.array(x)  # x.shape (画像数, 1, 28, 28)
y = np.array(y)  # y.shape (画像数,)


# 訓練データとテストデータに分ける
x_train, x_test, y_train, y_test = \
    train_test_split(x, y, test_size=0.2, shuffle=False)

# テンソルに変換
x_train = torch.FloatTensor(x_train)
y_train = torch.LongTensor(y_train)
x_test = torch.FloatTensor(x_test)
y_test = torch.LongTensor(y_test)

# Dataloaderを準備
train_dataloader = torch.utils.data.TensorDataset(x_train, y_train)
train_dataloader = torch.utils.data.DataLoader(train_dataloader, batch_size=4)

ニューラルネットワークの定義

ここでは、畳み込み層→プーリング層→畳み込み層→プーリング層→全結合層→全結合層→出力層という構造とした。最初の全結合層への入力はその直前の層の出力を一次元に変換する必要がある。直前の層の出力サイズの計算は例えばこちら(定番のConvolutional Neural Networkをゼロから理解する - DeepAge)が参考になるが、ここでは高さ・幅が4x4の16チャンネルとなっており、Numpyのviewメソッドを用いて形状の変換を行っている。フィルタ数やフィルタサイズはPyTorchのチュートリアル(Training a Classifier — PyTorch Tutorials 1.4.0 documentation)を参考に適当に決めた。

from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F

class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)  # チャンネル1, フィルタ6, フィルタサイズ5x5
        self.pool1 = nn.MaxPool2d(2, 2)  # 2x2のプーリング層
        self.conv2 = nn.Conv2d(6, 16, 5)  # チャンネル6, フィルタ16, フィルタサイズ5x5
        self.pool2 = nn.MaxPool2d(2, 2)

        # これまでの畳込みとプーリングで、16チャンネルの4x4が入力サイズ
        self.fc1 = nn.Linear(16 * 4 * 4, 64)  # fc: fully connected
        self.fc2 = nn.Linear(64, 16)
        self.fc3 = nn.Linear(16, 2)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = x.view(-1, 16 * 4 * 4)  # [(バッチサイズ), (一次元配列データ)]に並び替え
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

学習

Dataloaderを使ってミニバッチ学習させた。学習に関してはCNNだからといって特別なところはない。

model = Model()

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

num_epochs = 20

for epoch in range(num_epochs):
    total_loss = 0.0
    for inputs, labels in train_dataloader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print('Epoch: {}  Loss: {}'.format(epoch, total_loss))

検証(正答率の確認)

はじめに準備したテストデータを用いて正答率を確認。

outputs = model(x_test)
_, predicted = torch.max(outputs.data, 1)
correct = (y_test == predicted).sum().item()  # Tensorの比較で正答数を確認
print("正答率: {} %".format(correct/len(predicted)*100.0))

結果

問題が単純なためか、正答率は高い。

Epoch: 0  Loss: 6.9794119000434875
Epoch: 1  Loss: 6.852706849575043
Epoch: 2  Loss: 6.788195848464966
Epoch: 3  Loss: 6.761787533760071
Epoch: 4  Loss: 6.738182067871094
Epoch: 5  Loss: 6.654882311820984
Epoch: 6  Loss: 6.485922873020172
Epoch: 7  Loss: 6.05694118142128
Epoch: 8  Loss: 5.022340148687363
Epoch: 9  Loss: 2.5453680232167244
Epoch: 10  Loss: 0.31626373156905174
Epoch: 11  Loss: 0.011260125378612429
Epoch: 12  Loss: 0.0037041376635897905
Epoch: 13  Loss: 0.00182408066757489
Epoch: 14  Loss: 0.0011753853141271975
Epoch: 15  Loss: 0.0009152144466497703
Epoch: 16  Loss: 0.0007907451872597449
Epoch: 17  Loss: 0.0007194795762188733
Epoch: 18  Loss: 0.0006725111852574628
Epoch: 19  Loss: 0.0006386453569575679
正答率: 100.0 %

参考

シンプルな CNN | PyTorch で簡単なニューラルネットワークを構築し画像を分類する方法
シンプルな例で詳しく説明されています。

Pytorchのニューラルネットワーク(CNN)のチュートリアル1.3.1の解説 - Qiita
畳み込みの部分が図を用いて詳しく説明されています。

CNN(Convolutional Neural Network)を理解する - sagantaf
フィルタサイズやパディング、ストライドの影響について解説があります。

Convolutional Neural Networkとは何なのか - Qiita
CNNが具体的に何を見ているかについてわかりやすい説明があります。

メモ: Pillowを用いた深層学習用の画像データ数値化

画像データの深層学習を行うためのPillowを用いた画像データ数値化のメモ。 次の適当な画像データを、深層学習に使えるように数値化する。

f:id:schemer1341:20191027221206p:plain:w300 (testfig.png)
from PIL import Image
import numpy as np

im = Image.open("./testfig.png") # 画像ファイル読み込み
im = im.convert("L") # 画像のモード変更 "L"は8bitグレイスケール
im = im.resize((256, 256)) # 画像サイズ変更
data = np.array(im) # 画像を配列として数値データ化
im.show() # 処理後の画像表示

カラー、透過度等も含めたその他の画像モードは下記リンク参考

Concepts — Pillow (PIL Fork) 6.2.1 documentation

その他参考

https://mmm-ssss.com/2018/11/12/deeplearning_3/#toc

PIL/Pillow チートシート - Qiita

ディープラーニングによる化学プラントの異常検知の簡単な例題

はじめに

例えば連続で運転し化学品を製造するプラントにおいて、何らかの原因で反応温度や圧力が異常に上昇したり、製品中の不純物が増加するなどして運転が継続できず、プラント停止を余儀なくされることがある。この場合、プラント設備の復旧や製品在庫管理・販売計画見直しに多大な人手・費用が必要となる。

異常の兆候に早い段階で気付くことができれば、対策を講じることで必要な人手・費用を抑えることができる。このためAI(ディープラーニング)による化学プラントの異常検知が注目されており、例えば下記の三井化学の例がある。ここでは簡単な例題でディープラーニングによる異常検知をやってみる。

人工知能(AI)を用いて、化学プラントの製造過程で製品の品質予測に成功|2016|ニュースリリース|三井化学株式会社

例題

下記の原料Aから生成物Bを製造する反応器を考える。①原料流量、②原料温度、③反応圧力、④熱媒体温度を制御して、⑤出口温度をある一定の温度範囲内に収めるように運転している(範囲から外れると製品品質悪化)。

f:id:schemer1341:20190915012130p:plain:w500

以下のグラフはある1000日間の運転データである。900日を過ぎたあたりで出口温度がこのプラントの管理上限値である320℃を超え異常が認識された。出口温度の上昇は反応副生物の分解によるもので、急激に反応が起こるのでなく、ゆっくりと時間をかけて分解量が増加していくことがわかっているものとする。

f:id:schemer1341:20190916230731p:plain

反応量が少ないうちに出口温度の僅かな上昇で異常を検知できることが好ましいが、制御値①②③④の振れが出口温度を振れさせてしまい、副生物分解による僅かな温度上昇を見分けることは難しい状況である。

ディープラーニングを用いて早い段階で異常を検知することができるか?

※このデータは下記コードから人工的に生成した。気になる人は参考に。制御値①②③④はランダムで振れを発生させ、出口温度は①②③④を変数とした所定の関数から算出している。さらに700日目から日数に応じて増加する異常値を出口温度に加えている。ここで使用したデータそのものもリンク先に置いてある。

chemical-engineer-learns-ai/20190921_anomaly_detection at master · nakamura-13/chemical-engineer-learns-ai · GitHub

異常検知の手順

次の手順でディープラーニングを用いて異常を検知する。

  1. 正常な期間のデータをディープラーニングで学習し、制御値①②③④と出口温度を相関するモデルを作る
  2. 上記1.のモデルを使って異常な期間の出口温度の予測値を得る
  3. 異常な期間の実際の出口温度とディープラーニングで予測した出口温度の差を異常度とする

正常期間データを学習したモデルから得た異常期間の予測値は副生物分解による温度上昇を含まない値となる。したがって、この値と異常期間の実際の値の差を取ることで異常さの度合いが見えるはずである。

正常期間データの学習

700日目までは正常だったと考えて、その期間のデータを再現するようネットワークを学習させた。

f:id:schemer1341:20190918004339p:plain:w500

なお用いたネットワークは下記の通りで、ノード数20の中間層を2つ入れた。

## ニューラルネットワークモデル設定
class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.l1 = nn.Linear(4, 20) # 入力4つ (原料流量、原料温度、反応圧力、熱媒体温度)
        self.l2 = nn.Linear(20, 20)
        self.l3 = nn.Linear(20, 1) # 出力1つ (出口温度)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        return self.l3(x)

学習させた結果、次のグラフとなった。色々と試行錯誤したものの初期の乖離は解消されなかった。しかしながら、それ以外の期間についてはほぼデータを再現できている。

f:id:schemer1341:20190918010556p:plain:w500

学習したモデルによる出口温度予測および異常度の算出

異常と考えられる期間も含めたデータとモデル予測値は次のグラフとなる。異常を認識したタイミングよりも早い時点でデータとモデル予測値に差が生じていることが見て取れる。

f:id:schemer1341:20190919000843p:plain:w500

データとモデル予測値の差の絶対値を「異常度」と定義し、それをグラフにプロットすると次の図となる。異常度がいくつになったら異常と判断するかがポイントとなってくるが、例えば初期を除き10℃以上を異常と考えるなら、100日程度早めに異常兆候を知ることができる。100日あれば事前に十分な対策を講じることができるだろう。

f:id:schemer1341:20190921175844p:plain:w500

おわりに

実際のプラントでは様々なノイズが制御値や計測値に影響するため、こう簡単にはいかないかもしれないが、実務でも適用をトライしたいと思う。

使用したコード

chemical-engineer-learns-ai/20190921_anomaly_detection at master · nakamura-13/chemical-engineer-learns-ai · GitHub

深層強化学習の簡単な例 〜Double DQN適用〜

はじめに

以前の記事で簡単な問題を対象にDQN、Experience Replayを適用してみたが、今回はさらにDouble DQNを追加してみる。なお下記サイトを参考にした。

Let’s make a DQN: Double Learning and Prioritized Experience Replay – ヤロミル

https://www.renom.jp/ja/notebooks/product/renom_rl/ddqn/notebook.html

Double DQN

通常のDQNでは次の形でニューラルネットワークを更新している。

f:id:schemer1341:20190911214824p:plain:w500

ポイントは \gamma {\rm max}Q(s_{t+1}, a)で、次の状態の最大のQ値を求めるのに自分自身のネットワークを使用している点である。もし仮にQ値の初期値が変に偏っていて、ある行動のQ値が他に比べ極端に高い場合、同じネットワークで更新を繰り返すと、その行動の価値が過大に評価されてしまうことになる。

これを緩和するために提案された方法がDouble DQNである。この方法では、メインのネットワークとは別のネットワーク(Target network)を作り、ネットワーク更新時に次の状態における行動はメインネットワークで決めるが、その行動の価値評価は別のネットワークで行う。なお別のネットワークには過去のメインネットワークを用いる。この方法を用いる場合のネットワーク更新式は次の形となる。

f:id:schemer1341:20190911011958p:plain:w250 f:id:schemer1341:20190911012155p:plain:w570

対象とする問題

前回記事と同一の報酬払出装置を考える。前回のプログラムを修正してDouble DQNを適用する。

深層強化学習(Deep Q Network, DQN)の簡単な例 〜Experience Replay追加〜 - 化学系エンジニアがAIを学ぶ

Double DQN の適用

次の通り、メインネットワークに加え、メインとは別のネットワーク(Target network)を準備する。

import torch
import torch.nn as nn
import torch.nn.functional as F

class DQN(nn.Module):
    def __init__(self):
        super(DQN, self).__init__()
        self.l1 = nn.Linear(1, 3)
        self.l2 = nn.Linear(3, 3)
        self.l3 = nn.Linear(3, 2)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = self.l3(x)
        return x

dqn = DQN()   # メインネットワーク
dqn_t = DQN() # もう1つのネットワーク(Target network)
optimizer = torch.optim.SGD(dqn.parameters(), lr=0.01)
criterion = nn.MSELoss()

Double DQNを適用する場合のネットワーク更新は下記の形となる。通常のDQNと異なるのは下記「Q値の算出」にあるa_m、max_q_nextを求める部分だけである。(なおここではExperience Replayを利用している。)

BATCH_SIZE = 50

def update_dqn(replay_memory):
    ## メモリがバッチサイズより小さいときは何もしない
    if len(replay_memory) < BATCH_SIZE:
        return

    ## ミニバッチ取得
    transitions = replay_memory.sample(BATCH_SIZE)
    ## (状態、行動、次の状態、報酬)✕バッチサイズ を (状態xバッチサイズ、行動✕バッチサイズ、、、)に変える
    batch = Transition(*zip(*transitions))
    
    ## 各値をtensorに変換
    state = torch.FloatTensor(batch.state).unsqueeze(1)
    action = torch.LongTensor(batch.action).unsqueeze(1)
    next_state = torch.FloatTensor(batch.next_state).unsqueeze(1)
    reward = torch.FloatTensor(batch.reward).unsqueeze(1)

    ## Q値の算出
    q_now = dqn(state).gather(-1, action) # 今の状態のQ値
    a_m = dqn(next_state).max(-1)[1].unsqueeze(1) # メインネットワークで決めた次のaction,[1]でindexを得る
    max_q_next = dqn_t(next_state).gather(-1, a_m) # Target networkから上で決めたactionのQ値を得る
    gamma = 0.9
    q_target = reward + gamma * max_q_next # 目標のQ値

    # DQNパラメータ更新
    optimizer.zero_grad()
    loss = criterion(q_now, q_target) # 今のQ値と目標のQ値で誤差を取る
    loss.backward()
    optimizer.step()
    
    return loss.item() # lossの確認のために返す

学習時に所定の計算ステップごとにTarget networkにメインネットワークをコピーして更新される形とする(下記コードの一番下)。更新されるまでは学習時に過去のメインネットワークを参照する形となる。

NUM_EPISODES = 1200
NUM_STEPS = 5

log = [] # 結果のプロット用

for episode in range(NUM_EPISODES):
    env = Dispenser(0)
    total_reward = 0 # 1エピソードでの報酬の合計を保持する

    for s in range(NUM_STEPS):
        ## 現在の状態を確認
        state = env.state
        ## 行動を決める
        action = decide_action(state, episode)
        ## 決めた行動に従いステップを進める。次の状態、報酬を得る
        next_state, reward = env.step(action)
        ## 結果をReplayMemoryに記録
        memory.push(state, action, next_state, reward)
        ## DQNを更新
        loss = update_dqn(memory)
        total_reward += reward

    log.append([total_reward, loss])
    if (episode+1)%100==0: print(episode+1, loss)
    ## 2エピソード = 計算10ステップ 毎の頻度でTarget networkにメインネットワークをコピー
    if episode%2==0:
        dqn_t.load_state_dict(dqn.state_dict())

学習結果

多少計算が安定している感じがするが、このくらいの問題ではあまり差はないのかもしれない。 f:id:schemer1341:20190911221649p:plain

コード

chemical-engineer-learns-ai/dispenser_Double-DQN.py at master · nakamura-13/chemical-engineer-learns-ai · GitHub

メモ: PyTorch 重み、バイアスの初期化について

はじめに

PyTorchのニューラルネットワークの重み・バイアスの初期化についてのメモを記す。

重み

重みの内容は次のようにして確認できる。

>>> import torch.nn as nn
>>> l = nn.Linear(1, 3)
>>> l.weight
Parameter containing:
tensor([[ 0.6204],
        [-0.5651],
        [-0.6809]], requires_grad=True)

重みの初期化は次のようにnn.initモジュールの関数を使用することで実施できる。

>>> nn.init.xavier_uniform_(l.weight) # Xavierの初期値
Parameter containing:
tensor([[ 0.5318],
        [-0.8438],
        [-1.1093]], requires_grad=True)

>>> nn.init.kaiming_uniform_(l.weight) # Heの初期値
Parameter containing:
tensor([[ 2.3349],
        [-0.9218],
        [-1.3357]], requires_grad=True)

単純な正規分布・一様分布や定数を設定できる関数もnn.init内に準備されている。

詳しくはこちらを参照: https://pytorch.org/docs/master/nn.init.html

バイアス

バイアスの内容は重みと同様に確認できる。

>>> l.bias
Parameter containing:
tensor([-0.9920,  0.6406, -0.6558], requires_grad=True)

バイアスの初期化も重みと同様のやり方。

>>> nn.init.uniform_(l.bias, 0, 1) # 一様分布
Parameter containing:
tensor([0.8032, 0.7810, 0.1619], requires_grad=True)

>>> nn.init.constant_(l.bias, 0) # 定数
Parameter containing:
tensor([0., 0., 0.], requires_grad=True)

デフォルトの初期値

nn.Linearのソースを見ると重みについてはHeの初期値が入っている模様。

pytorch/linear.py at master · pytorch/pytorch · GitHub

def reset_parameters(self):
    init.kaiming_uniform_(self.weight, a=math.sqrt(5))
    if self.bias is not None:
        fan_in, _ = init._calculate_fan_in_and_fan_out(self.weight)
        bound = 1 / math.sqrt(fan_in)
        init.uniform_(self.bias, -bound, bound)

参考

まるまるにっき | pytorchの重みの初期化の方法

python - How to initialize weights in PyTorch? - Stack Overflow

What is the default initialization of a conv2d layer and linear layer? - PyTorch Forums

ニュース: Preferred Networks、JXTGエネルギーと石油精製プラントの最適化・自動化に関する共同研究を開始

Preferred NetworksがJXTGエネルギーと石油精製プラントの最適化・自動化に関する共同研究を開始したそうです。10億円もの資金が調達されるそうで、一気に技術開発が進むことが期待されます。近い業界に関わる者としては、得られた知見などを可能な範囲でオープンにしてもらえたらと思いますが、難しいでしょうね。。。

it.impressbm.co.jp

iotnews.jp