夜は短し歩けよ未来大生

未来大生が人工知能を中心に勉強していく上で、学習メモや日記として書いていきます。

簡単に理解してDQNを実装してみる

この記事は

この記事はFUN Advent Calendar 2018 5日目の記事です。 去年に引き続き12/5に記事を公開してます。

昨日は mecaota さんでした。

注意

時間足りなくて途中で心が折れてます。気をつけてください。 だいぶ適当なこと書いてるので、間違いは訂正コメントしてください。 時間見つけて直します()

この記事を読んでわかること

  • DQNの概要
  • DQNがどんなしくみでできているのか
  • DQNの実装
  • PLEでの学習のさせ方
  • OpenAI Gym以外での学習

この記事を読んでもわからないこと

この記事の対象者

  • MNISTはやったことある人
  • numpyある程度分かる人(行列の計算とかreshapeとか)
  • 深層強化学習に興味あるけど、強化学習がいまいちわからない人
  • OpenAI Gymでしかやってなくて少し飽きてきた人

<注意 >

  • DQNを実装してみた
  • DQNの忠実な再現ではない(Fitted Neural NetworkとDeep Q Networkを組み合わせたような感じ)
  • 正確な表現ではなく、私の感覚的な表現が多いです(コメントで表現の訂正お待ちしてます)
  • こちらの本をメチャクチャ参考にしています。この本とてもおすすめです。

つくりながら学ぶ! 深層強化学習 ~PyTorchによる実践プログラミング~

今回実装したソースコードコチラ (実装はPyTorch, Jupyter Notebook)

今回実装したらこうなるよ↓

DQNとは

まず、DQNを説明する前に強化学習(Reinforcement Learning)について整理しておく必要があると思います。

強化学習(きょうかがくしゅう、: Reinforcement learning)とは、ある環境内におけるエージェントが、現在の状態を観測し、取るべき行動を決定する問題を扱う機械学習の一種。エージェントは行動を選択することで環境から報酬を得る。強化学習は一連の行動を通じて報酬が最も多く得られるような方策(policy)を学習する。代表的な手法としてTD学習Q学習が知られている。 (Wikipedia)

ちょっとイメージしにくいですよね。

宿題をやりたくない小学生のAくんを考えてみる

イメージしやすくするために、宿題をやりたくない小学生のAくんを考えてみます。

Aくんは宿題をやりたくありません。でも、Aくんのお母さんは宿題をやるように言います。

そしてお母さんは機嫌が良いときと悪い時があり、機嫌が良いと宿題に対して寛容になったり、機嫌が悪いと宿題に対して厳しくなります。

そこで、Aくんは機嫌を見て少しでも宿題をやらずにお母さんを誤魔化すために色んな試行錯誤をします。

例えば、

  • 機嫌が良い時に宿題をやってるふりをしながらYouTubeを見る
  • 機嫌が悪い時に宿題を学校に忘れる
  • 機嫌の悪い時に家のお手伝いに精を出す ....etc

などなど、色んな方法を繰り返してみます。

そして基本的には、これらの結果として2つの可能性が考えられます。

  1. お母さんをうまく誤魔化し、褒めてもらえるf:id:tkr1205:20181125175245p:plain
  2. お母さんをうまく誤魔化せず、怒られるf:id:tkr1205:20181125175250p:plain

これを毎日繰り返していくことで、お母さんの機嫌を見て誤魔化せる方法をAくんは知るでしょう。

逆に怒られてしまうような行動も知ることになります。

このようにして、お母さんの様子を見て、宿題をやらずに誤魔化せる方法をAくんは知ることができました。

Aくんのお話を強化学習の言葉で置き換えてみる

強化学習ではAくんはエージェント(Agent)と呼ばれます。ゲームで例えるとゲームをプレイする脳みそを持つプレイヤーのことでもあります(囲碁や将棋なら指し手、テニスや卓球ならプレイヤー)。

お母さんの機嫌の良い悪いは、強化学習における状態(State)と呼ばれます。ゲームで例えるとゲームをプレイするフィールドのことです(囲碁や将棋なら盤面、テニスならボールの位置など)。

そして、エージェントは状態からどう動くのかを決めます。例えば、お母さんの機嫌が良い場合(状態)に「宿題をやらずにゲームをする」といった、行動を選択します。この選択した行動を強化学習では行動(Action)と呼びます。

また、Aくんの結果によって 1.褒められる 2.怒られる の2つの結果が得られました。これらを強化学習では報酬(Reward)と呼びます。つまり、良い結果が得られれば褒められ、悪い結果となれば怒られるのです。強化学習では、この報酬の和を最大化します。つまり、たくさん褒められようとします。そりゃそうですよね、怒られるよりたくさん褒められたいですよね。

表で一度整理しておきます

Aくんのお話 強化学習のお話
Aくん エージェント(Agent)
お母さんの機嫌 状態(State)
Aくんの行動 行動(Action)
褒められる or 怒られる 報酬(Reward)

強化学習の言葉で言い換えると

ここで、Aくんのお話を少し強化学習で言い換えてみると、

Agent(Aくん)は状態(お母さんの機嫌)を観測し、報酬の和を最大化する(たくさん褒められる)行動(誤魔化しかた)を選択する

このようにして強化学習では状態を観測し、その中で最も良いサボり方を学習しようといった手法です。

この手法の中に有名なものでは、

  • Q学習
  • Sarsa
  • Deep Q-Network(今回のメインのお話)

といったものがあります。

Q学習とは

DQNを理解する上で重要なアルゴリズムが一つあります。それはQ学習(Q-Learning)です。

Q学習とは、

Q学習は有限マルコフ決定過程において全ての状態が十分にサンプリングできるようなエピソードを無限回試行した場合、最適な評価値に収束する (Wikipedia)

これもわかりにくいですよね。

まず有限マルコフ決定過程とは、時刻t+1への状態遷移は時刻tにおける状態と行動にのみ依存し、それ以前の状態や行動には関係がないということです。

上のAくんの例で例えるならば、Aくんが褒められるか怒られるかは、その前にAくんがどんな行動を取ったかによってのみ決まり、何日も前のことなど何も関係ないということです。

考えてみれば当然のことです。今日出された宿題を上手く誤魔化して褒められるか、バレて怒られるかは、昨日怒られたかどうかには関係しません。今日上手く誤魔化せるかどうかです。

Q学習ではQテーブルというものを考えます。

具体的には以下のようなものです。

f:id:tkr1205:20181125175229p:plain

Qテーブルは 状態の数(お母さんの機嫌) x 行動の数(Aくんの行動)の表で表します。

Qテーブルにかかれている数字は行動価値(action value)またはQ値と呼ばれるものです。行動価値は言葉の通り、ある状態においてその行動がどれくらい良い行動なのかを表すものです。例えば、お母さんの機嫌の悪いときに「ゲームをする」という行動の価値はとても低いです。ですが、機嫌の良いときに「お手伝いをする」という選択の価値は高いです。

Qテーブルはいわば辞書のようなもので、このような表にすることで状態と行動の辞書のようなものができます。

そこで、以下のようにして今の時刻における状態をQテーブルから参照してあげることで、最も行動価値の高い行動(最も褒められる行動)がわかります。

f:id:tkr1205:20181125175310j:plain

ということは、この行動価値がとても重要な値だと分かると思います。もしも値が間違っていた場合、お母さんの機嫌が悪いときに「ゲームをする」という行動をしてしまうかもしれません。

では、この行動価値はどのようにして求めるのでしょうか? 行動価値を求めるには行動価値関数(action value function)というものを使います。

Q学習では、この行動価値関数をより良いものにするべく学習していきます。

そして、そのためにQ学習では以下のような状態価値の更新式を用います。 (Q(s_t, a_t) は時刻tの状態s_tにおけるa_tの行動価値のことです。)

[tex:Q(s_t, a_t) = Q(s_t, a_t) + \alpha(R(s_t, a_t) + \gamma\max{a'}E[Q(s{t+1}, a')] - Q(s_t, a_t))]

こんなもんいきなり見せられてもわからない人はわからないので、色々あって式の変形をしちゃいます。

[tex:Q(s_t, a_t) = R(s_t, a_t) + \gamma\max{a'}Q(s{t+1}, a')]

すると、なんとも不思議。こんなに短い式になりました。(この式変形の詳細が知りたい人は書籍やGoogleを参考に)

R(s_t, a_t)とは、状態s_tにおいてa_tという行動を取った結果の報酬のことで、褒められたか怒られたかを教えてくれます。

[tex:\gamma\max{a'}Q(s{t+1}, a')] は一度に説明できないので分けて説明します。

まず、先にQ(s_{t+1}, a')を説明します。 これは時刻t+1の状態における行動a'の行動価値のことです。

では、このa'をどう選ぶかですが、これを指定しているのが[tex:\max{a'}]の部分です。 ここでは、[tex:\max{a'}]は「状態価値が最大となるようなa」という意味です。

つまり、[tex:\max{a'}Q(s{t+1}, a')]というのは、時刻t+1の状態における行動価値が最大になる行動の行動価値ということです。 この時刻t+1で最大となく行動価値にかけている\gammaですが、これは時間割引率と呼ばれるものです。

時間割引率とは、人の習性を参考にしたものです。例えば、人は今日得られる1万円と10年後に得られる1万円のどちらの方が欲しいかといった問題の場合、ほとんどの人は今日得られる1万円の方が欲しいと考えます。 このように未来で得るものは少し価値を差し引いて考えてあげなければなりません。そのため、ここでは時間割引率というものを使って、未来の価値を少し差し引いてあげます。

もし、上の説明が難しいと感じたなら以下のように考えてください。

行動価値関数に状態と行動を引数として渡すと行動価値を教えてくれる!!!

f:id:tkr1205:20181125175337j:plain

また、もう少し厳密にわかりやすく知りたい人はコチラの動画(ニコニコ動画)がとてもおすすめです。

ここから本番

DQNとは

DQNとは簡単に言うと行動価値関数やQテーブルの代わりにニューラルネットワークを使おうということです。

ニューラルネットワークは関数のモノマネができるということは有名です。 ポケモンで例えるならば、メタモンです。(メタモンかわいい、でもミミッキュの方がかわいい)

このQ関数をニューラルネットワーク関数近似するという考えはNeural Fitted Q Iterationというもので提案されました。

具体的には、現在の状態を入力として行動を出力するというような関数をニューラルネットワークにさせます。

f:id:tkr1205:20181125175325j:plain

本来提案されたDQNでは、ニューラルネットワークへゲームの画像が入力として渡されます。ですが、今回のDQNではマシンスペックと相談した結果、入力としてエージェントの位置などの座標を入力としてニューラルネットワークに渡しています。

また、他にもNeural Fiited Q Iterationとは異なり、こんなにも有名になった理由があります。それは、学習の安定です。今までのどのような手法と比べても安定して学習させることに成功しています。

学習の安定を実現した理由として2つ挙げられます。1つはExperience Replayと呼ばれるもの。もう一つはFixed Target Q Networkと呼ばれるものです。これらの説明は実装の項で行います。

DQNの実装

今回のDQNの実装については主にPyTorchとPLE(PyGame Learning Environment)で実装しています。

環境については以下です。

使用した環境

環境 Version
OS mac OS X Mojave
Python 3.6.1
Anaconda Anaconda 4.4.0 (x86_64)
numpy 1.15.0
matplotlib 2.0.2
PLE 3.10
pygame 1.9.4
Torch 0.4.0

PLEが公式にサポートしているのはPython2.7.6になります(2018年11月5日現在)。
他のバージョンでは動作は保証できません。

PLEなどのインストールについては以前の記事で解説しているので、コチラを参考にしてみてください。

まずは、必要なライブラリ群をインポートしていきます。

今回はFlappyBirdと呼ばれるタスクに取り組むのでPLEからFlappyBirdをインポートしています。

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import os
os.environ["SDL_VIDEODRIVER"] = "dummy"  # ポップアウトウィンドウを表示しないようにする
from ple import PLE
from ple.games.flappybird import FlappyBird

ここで、namedtupleというものを使って、学習に使うデータの形を宣言しておきます。これに従って以降は学習を行います。

学習経過の可視化

学習のしているのをただひたすら数字で見るのもいいですが、せっかくゲームを学習させているのでどれくらいできるようになっているのか見れるようにしましょう。 表示する頻度は下のパラメータで調整できます。

from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
    
    plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')
    
    def animate(i):
        patch.set_data(frames[i])
    
    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=16)
    
    anim.save('movie_flappy_bird_DQN.mp4')
    display(display_animation(anim, default_mode='once'))

namedtupleを使うことで、変数をまとめることができ、変数えのアクセスや管理がしやすくなります。 今回は学習に(現在の状態, 行動, 行動をとった後の状態, 得られた報酬)を使います。これらが必要な理由がもしわからない場合は、Q学習とはの項から読み直してみると分かるかもしれません。

from collections import namedtuple
Transition = namedtuple('Transicion', ('state', 'action', 'next_state', 'reward'))

次に学習に使うハイパーパラメータを宣言しておきます。これは、学習が上手くいかない場合に変更することが多いので、以下の変数はハイパーパラメータとして宣言しておくことをオススメします。

GAMMA = 0.99 # 時間割引率ガンマ
MAX_STEPS = 1200 # 1試行のstep数(フレーム数)
NUM_EPISODES = 800 # 最大試行回数
PRINT_EVERY_EPISODE = 50 #50回に一回logを出力する
SHOW_GIF_EVERY_EPISODE = 100 #100回に一回映像で出力する

Experience Replayについて

Experience Replayとは日本語にすると経験再生と言います。 日本語にするとダサいです。機械学習では、過学習(overfitting)というものが起きます。過学習について説明は特にしませんが、これは良くないことです。強化学習においても例外ではなく、過学習というものが発生します。これの原因は、ゲームのデータをそのまま保存してしまうと時系列的な相関が発生してしまいます。それらの相関をニューラルネットワークは勘違いして学習してしまいます。ですが、これは学習において避けたいことなので、それに対する対処を考えなくてはなりません。そこでReplay Memoryというものを考えます。

学習中のデータはReplay Memoryに一度保存します。そしてニューラルネットワークの学習の時にReplay Memoryからランダムにサンプリング(取り出す)することで時系列での相関を減らします。

class ReplayMemory:
    
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY #メモリの最大値
        self.memory = [] # 経験を保存するリスト
        self.index = 0 # 保存するindexを表す変数
    
    def push(self, state, action, state_next, reward):
        '''trasicion = (state, action, state_next, reward)をReplayMemoryに保存する'''
        
        if len(self.memory) < self.capacity:
            self.memory.append(None) # メモリが満タンじゃないときは足す
            
        # 各引数をnamuedtupleでまとめ、memoryに保存する
        self.memory[self.index] = Transition(state, action, state_next, reward)
            
        self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす
        
    def sample(self, batch_size):
        '''batch_sizeだけ、ランダムに取り出す'''
        return random.sample(self.memory, batch_size)

    def __len__(self):
        '''関数lenに対して、現在のmemoryの長さを返す'''
        return len(self.memory)

ニューラルネットワークの実装 

ここからニューラルネットワークの実装に入っていきます。ニューラルネットワークの実装にはPyTorchを使用しています。

まずは、ライブラリのインポートです。

import random
from copy import deepcopy
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

次に学習に関するハイパーパラメータです。こちらも上と同様にハイパーパラメータとしてまとめて外に出しておくのがオススメです。

BATCH_SIZE = 32 # バッチサイズ
CAPACITY = 10000 # Replay Memoryに保存するデータの最大量

PyTorchでのネットワークの宣言は二種類くらい書き方があります。好みで選んでください。

今回使用したニューラルネットワークは、中間層が32個のユニットからなる2層のニューラルネットワークです。(「ディープじゃないやん」とかは無しで。。。)

class Net(nn.Module):
    
    def __init__(self, num_states, num_actions):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(num_states, 32)
        self.fc2 = nn.Linear(32, 32) #中間層
        self.fc3 = nn.Linear(32, num_actions)
    
    def forward(self, x):
        h1 = F.relu(self.fc1(x)) # 活性化関数にはReLu
        h2 = F.relu(self.fc2(h1))
        output = self.fc3(h2)
        return output

次にエージェントの脳みそとなるBrainクラスを宣言します。ここでは、どんな行動を選択するのか決めたり、ネットワークの更新を行います。

Fixed Target Q Networkについて

Experience Replayと同様に重要なメソッドです。 Experience Replayは時系列の過学習を抑制するためのものでしたが、Fixed Target Q Networkは学習を安定させるためのものです。

Fixed Target Q Networkの仕組みを簡単に説明します。強化学習では各エピソード毎に正解とする目標を定め、その目標とのloss(どれくらい間違えているか)を求め、ネットワークを修正していきます。ですが、この目標とする値が結構バラバラになります。そのせいで以下の図のように、目標の値が振動してしまい結果的に上手く学習が進まないことがあります。(緑が目標とする値)

f:id:tkr1205:20181125175345p:plain

このように目標とする値が振動してしまわないように、ある一定間隔の間は目標とする値を固定するようにします。その結果、振動しにくくなり学習が進みやすくなります。また、図にすると以下のようになります。目標とする値は振動せずに、一定時間固定されています。

f:id:tkr1205:20181125175349j:plain

1つ目の図のように目標とする値が振動してしまうのには、原因があります。それは目標とする値を出力するのもまたネットワークが行うからです。これが学習の完了しているネットワークなら問題はありませんが、未だ学習の終わってないネットワークを使って目標とする値を出しているため、図のように振動していまいます。そのため、目標とする値を出力するネットワークの重みを固定してあげます。そうすることで、同じ値を目標として学習させることができます。また、ネットワークの重みをずっと固定していても良いことはないので、定期的に学習の少し進んだネットワークの重みをコピーしてあげます。

f:id:tkr1205:20181125175344p:plain

コンストラクタでは、主にオブジェクトの生成を行っています。 Experience Replayのために経験を保存しておくためのReplay Memoryや、学習するネットワークのモデル,Fixed Target Q Networkのためにモデルのコピーも行っています。

replay関数では、Experience Replayを使って学習を行っています。 まず、Replay Memoryからバッチサイズの数だけデータを取りだします。ただ取り出しただけだと、1x4がBATCH_SIZE数ならんでいて、ネットワークの学習に使うには少し不都合です。なので、形を変えてあげます。

そこで取り出したデータを上で宣言した、namedtupleを使って(state, action, next_state, reward)の1つの形にまとめてあげます。以下のような書き方をすることで(state x バッチサイズ, action x バッチサイズ, next_state x バッチサイズ, reward x バッチサイズ)という形にすることができます。

batch = Transition(*zip(*transitions))
class Brain:
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions # 行動の数を取得
        
        # 経験を保存するメモリオブジェクトを生成
        self.memory = ReplayMemory(CAPACITY)
        
        # NNを構築
        self.model = Net(num_states, num_actions)
        
        #print(self.model) # ネットワークの形を出力
        
        # target_net
        self.target_net = copy.deepcopy(self.model)
        self.target_net.load_state_dict(self.model.state_dict())
        
        # 最適化手法の設定
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        
    def replay(self):
        '''Experience Replayでネットワークの結合パラメータを出力'''
        
        # メモリサイズがミニバッチより小さい間は何もしない
        if len(self.memory) < BATCH_SIZE:
            return
        
        
        # メモリからミニバッチ分のデータを取り出す
        transitions = self.memory.sample(BATCH_SIZE)
        
        # 各変数をミニバッチに対応する形に変形
        # trainsicionsは1stepごとの(state, action. state_next, reward)が、BATCH_SIZE分格納されている
        # つまり、(state, action, state_next, reward)xBATCH_SIZE
        # これをミニバッチにしたい
        # (state x BATCH_SIZE, action x BATCH_SIZE, state_next x BATCH_SIZE, reward x BATCH_SIZE)にする
        batch = Transition(*zip(*transitions))
        
        # 各変数の要素をミニバッチに対応する形に変形する
        # 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるが、
        # それを torch.FloatTensor of BATCH_SIZE x 4に変換する
        
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        
        
        # 教師信号となるQ(s_t, a_t)値を求める
        self.model.eval()
        
        # ネットワークが出力したQ(s_t, a_t)を求める
        # self.model(state_batch)は、右左の両方のQ値を出力しており
        # [torch.FloatTensor of size BATCH_SIZE x 2]になっている
        # ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが
        # 右か左かのindexを求め、それに対応するQ値をgatherで引っ張り出す
        state_action_values = self.model(state_batch).gather(1, action_batch)
        
        # max{Q(s_t+1, a)}値を求める。ただし、次の状態があるかに注意。
        
        # flappybirdがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
        non_final_mask = torch.ByteTensor(
            tuple(map(lambda s: s is not None, batch.next_state)))
        
        # まずは全部0にしておく
        next_state_values = torch.zeros(BATCH_SIZE)
        
        # 次の状態があるindexの最大Q値を求める
        # 出力にアクセスし、max(1)で列方向の最大値の[値、index]を求める
        # そしてそのQ値を取り出します
        self.target_net.eval()
        next_state_values[non_final_mask] = self.target_net(
            non_final_next_states).max(1)[0].detach()
        
        # 3.4 教師となるQ(s_t, a_t)を求める
        expected_state_action_values = (next_state_values * GAMMA) + reward_batch
        
        # ネットワークを訓練モードに切り替える
        self.model.train()
        
        # 損失関数を計算する (smooth_l1_lossはHuberloss)
        # expected_state_action_valuesは
        # sizeが[minbatch]になっているから、unsqueezeで[minbatch x 1]へ
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
        
        # 結合パラメータを更新する
        self.optimizer.zero_grad() # 勾配をリセット
        loss.backward() # バックプロパゲーションを計算
        self.optimizer.step() # 結合パラメータを更新
    
    def update_target_model(self):
        # モデルの重みをtarget_networkにコピー
        self.target_net.load_state_dict(self.model.state_dict())
    
    def decide_action(self, state, episode):
        '''現在の状態に応じて、行動を決定する'''
        epsilon = 0.41 * (1 / (episode + 1))
        
        if epsilon <= np.random.uniform(0, 1):
            self.model.eval()
            with torch.no_grad():
                action = self.model(state).max(1)[1].view(1, 1)
            # ネットワークの出力の最大値のindexを取り出す = max(1)[1]
            # .view(1, 1)は[torch.LongTensor of size 1] を size 1x1 に変換する
        
        else:
            # 0, 1の行動をランダムに返す
            action = torch.LongTensor(
                    [[random.randrange(self.num_actions)]])
            # actionは[torch.LongTensor of size 1x1]の形になる
        
        return action
    
    def brain_predict(self, state):
        self.model.eval() # ネットワークを推論モードに切り替える
        with torch.no_grad():
            action = self.model(state).max(1)[1].view(1, 1)
        return action

力尽きたので、コード載せます。基本はコードにコメント書いてるので、理解の助けになると思います。

時間があるときに追記していきます。

Agent Class

ゲームをプレイするエージェントくんのクラス(例えるならばAくん)

class Agent:
    def __init__(self, num_states, num_actions):
        '''課題の状態と行動の数を設定する'''
        self.brain = Brain(num_states, num_actions)
        # エージェントが行動を決定するための頭脳を生成
        
    def update_q_network(self):
        '''Q関数を更新する'''
        self.brain.replay()
        
    def update_target_model(self):
        self.brain.update_target_model()
        
    def get_action(self, state, episode):
        '''行動を決定する'''
        action = self.brain.decide_action(state, episode)
        return action
    
    def memorize(self, state, action, state_next, reward):
        '''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
        self.brain.memory.push(state, action, state_next, reward)
    
    def predict_action(self, state):
        action = self.brain.brain_predict(state)
        return action

Environment Class

実際のトレーニングループを記述しています

class Environment:
    
    def __init__(self):
        self.game = FlappyBird()
        self.env = PLE(self.game, fps=30, display_screen=False)
        self.num_states = len(self.game.getGameState())  # 8
        self.num_actions = len(self.env.getActionSet()) # 1
        self.agent = Agent(self.num_states, self.num_actions)
    
    def run(self):
        '''実行'''
        episode_10_list = np.zeros(10) # 10試行分の成功したstep数を格納し、平均ステップ数を出力に利用
        episode_final = False # 最後の試行フラグ
        reward_per_epoch = []
        lifetime_per_epoch = []
        
        for episode in range(NUM_EPISODES): # 試行回数分繰り返す
            
            self.env.reset_game() # 環境の初期化
            observation = self.game.getGameState() # 観測をそのまま状態sとして使用
            state = observation
            state = np.array(list(self.get_relative_state(state)))
            state = torch.from_numpy(state).type(torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
            # FloatTensor size 4 を size 1x4に変換
            state = torch.unsqueeze(state, 0)
            
            # record frame
            frames = [self.env.getScreenRGB()]
            
            cum_reward = 0  # このエピソードにおける累積報酬の和
            t = 0 #  time-step数
            step = 0 # episode数
            
            if episode % 15 == 0:
                self.agent.update_target_model()
            
            
            while not self.env.game_over():
                step += 1
                
                action = self.agent.get_action(state, episode) # 行動を求める
                # 出力されたactionをゲームに反映し、返り値に報酬を得る
                rew = self.env.act(self.env.getActionSet()[action])
                t += 1
                observation_next = self.game.getGameState() 
                done = self.game.game_over()
                
                frames.append(self.env.getScreenRGB())
                
                # 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
                if done:  # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
                    state_next = None  # 次の状態はないので、Noneを格納

                    # 直近10episodeの立てたstep数リストに追加
                    episode_10_list = np.hstack(
                        (episode_10_list[1:], step + 1))
                    
                    # 罰則を与える
                    reward = torch.FloatTensor([-1.0])
                    
                else:
                    if rew > 0:
                        reward = torch.FloatTensor([1.0])
                    else:
                        reward = torch.FloatTensor([0.0])  # 普段は報酬0
                    
                    state_next = observation_next  # 観測をそのまま状態とする
                    state_next = np.array(list(self.get_relative_state(state_next)))
                    state_next = torch.from_numpy(state_next).type(
                        torch.FloatTensor)  # numpy変数をPyTorchのテンソルに変換
                    state_next = torch.unsqueeze(state_next, 0)  # size 4をsize 1x4に変換
                    
                # 1 time-stepにおける報酬和
                cum_reward += rew
                
                # メモリに経験を追加
                self.agent.memorize(state, action, state_next, reward)

                # Q-networkを更新する
                self.agent.update_q_network()

                # 観測の更新
                state = state_next
                
                # 終了時の処理
                if done:
                    print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
                        episode, step + 1, episode_10_list.mean()))
                    reward_per_epoch.append(cum_reward)
                    lifetime_per_epoch.append(step+1)
                    break
                    
            if episode_final is True:
                # 動画の保存と描画
                display_frames_as_gif(frames)
                break
                    
            # 50エピソード毎にlogを出力
            if episode % PRINT_EVERY_EPISODE == 0:
                print("Episode %d finished after %f time steps" % (episode, t))
                print("cumulated reward: %f" % cum_reward)
                

            # 100エピソード毎にアニメーションを作成
            if episode % SHOW_GIF_EVERY_EPISODE == 0:
                print("len frames:", len(frames))
                display_frames_as_gif(frames)
                continue
            
            # 2000タイムステップ以上続いたアニメーションを作成
            if step > 2000:
                print("len frames:", len(frames))
                display_frames_as_gif(frames)
                
        # グラフの作成
        #make_graph(reward_per_epoch, lifetime_per_epoch)
    
    bucket_range_per_feature = {
        'next_next_pipe_bottom_y': 40,
        'next_next_pipe_dist_to_player': 512,
        'next_next_pipe_top_y': 40,
        'next_pipe_bottom_y': 20,
        'next_pipe_dist_to_player': 20,
        'next_pipe_top_y': 20,
        'player_vel': 4,
        'player_y': 16
    }
    
    def get_relative_state(self, state):
        # パイプの絶対位置の代わりに相対位置を使用する
        state = copy.deepcopy(state)
        state['next_next_pipe_bottom_y'] -= state['player_y']
        state['next_next_pipe_top_y'] -= state['player_y']
        state['next_pipe_bottom_y'] -= state['player_y']
        state['next_pipe_top_y'] -= state['player_y']

        # アルファベット順に並び替える
        state_key = [k for k, v in sorted(state.items())]

        # 相対位置を返す
        state_idx = []
        for key in state_key:
            state_idx.append(int(state[key] / self.bucket_range_per_feature[key]))
        return tuple(state_idx)

    
    # モデルの保存
    def save_model():
        torch.save(agent.brain.model.state_dict(), 'weight.pth')
        

main Class

# mainクラス
flappy_env = Environment()
flappy_env.run()
torch.save(flappy_env.agent.brain.model.state_dict(), 'weight.pth') #モデルの保存

おまけ

学習グラフの出力

Environmentクラスのグラフを出力する部分のコメントを外してね

def make_graph(reward_per_epoch, lifetime_per_epoch):
    fig, (axL, axR) = plt.subplots(ncols=2, figsize=(10,4))
    axL.set_title('lifetime')
    axL.grid(True)
    axL.plot(lifetime_per_epoch, color='magenta')
    axR.set_title('reward')
    axR.grid(True)
    axR.plot(reward_per_epoch , color='magenta')
    fig.show()

総評

疲れた、時間足りない。 文章とソースコードだけの説明には限界がある。 技術書わかりにくいって文句言ってたけど、そりゃそうだ、説明のしようがないね。

あと、今日誕生日なので誕生日プレゼント恵んでください。 欲しいものリスト

明日は mizukmbさんです。