グラフ機械学習と強化学習について

主にグラフ機械学習や強化学習手法を記載します。

Fitted Q-iteration

久しぶりの記事です。

オフライン強化学習を真面目に使いこなしていきたい。

ということでオフライン強化学習の中では基本的な手法であるFitted Q-iterationについてみていきます。D. Ernstらによって2005年に提案されています。

手法理解を優先とするため厳密さに欠けるところがあると思いますが、ご容赦ください。 Neural fitted Q-iterationやDeep Q-networkの基礎となっている手法です。

Value Iteration

強化学習reinforcement learningは、エージェントが環境とのやり取りを行いながら、累積報酬が最大になるような方法を求める機械学習手法です。

はじめに強化学習の設定を定義します。時刻$t$における状態$s_t$, 行動$a_t$とすると、エージェントが行動$a_t$を行なったとき、環境は、次の状態$s_{t+1}$, 報酬$r_{t}=R(s_t, a_t)$及び終端条件を返し、かつマルコフ決定過程に従います。 エージェントの方策(policy)を$\pi(a_t | s_t)$と表現します。この方策は、機械学習モデルで表現されることが多いですが、どのようなモデルでも利用できます。

次に、現在の状態や行動に対する期待報酬の予測値を表す「価値」を導入します。ある状態での状態価値$V^{\pi}(s)$は次のように定義されます。

$$ V^{\pi}(s)=\mathbb{E}\left[\sum_{k=0}^{\infty} \gamma^{k} r_{t+k+1} | s_t = s \right] $$

ここで、$\gamma$は割引率で、将来の報酬ほど価値を減衰させるパラメータです。 方策$\pi$に従って行動した時の状態$s$における期待報酬を表しています。これは状態価値(state valueと呼ばれます。一方、状態と行動に対する価値は

$$ Q^{\pi}(s, a)=\mathbb{E}\left[\sum_{k=0}^{\infty} \gamma^{k} r_{t+k+1} | s_t = s, a_t=a \right] $$

と表すことができ、状態行動価値(state-action valueもしくはQ関数と呼ばれます。 最適な価値というのはベルマン方程式を再帰的に解くことで得られることが知られています。 最適な方策を$\pi^{\ast}$としたとき、以下の式で表現されます。

$$ V^{\pi^{\ast}}(s) = \max_a \left\{r + \gamma \sum_{s'}P(s'|s, \pi(s))V^{\pi^{\ast}}(s') \right\} $$

ここで、$P$は状態遷移確率で、添え字を省略し、次の状態を$s'$として書いています。 状態遷移確率は一般的に未知であり、全ての状態に対して行動をして求めるのは困難であることが多いため、ベルマン方程式を近似する方法が提案されています。 その方法の1つに、時間的差分学習(temporal difference learning, TD learning)が知られています。これは以下の式によって価値を更新します。

$$ V(s) \leftarrow V(s) + \alpha (r + \gamma V(s') - V(s)), $$

ここで$\alpha$は学習率であり、$r + \gamma V(s')$はTD targetと呼ばれます。 この場合は1次の時間発展に対するTD targetを考慮しています。

一方、Q-learningでは、次のようになります。

$$ Q(s, a) \leftarrow Q(s) + \alpha (r + \gamma \max_a Q(s', a) - Q(s, a)), $$

ある状態$s'$の時に、全ての行動に対するQを参照し、最大のQの時の行動$a$を選択することで価値を更新していく方法です。 方策オフ型(off-policy)の更新方法です。実際には、この価値は通常、ニューラルネットワークなど何らかの形でモデル化されるため、価値関数と呼びます。

最適なQ関数を求めるために、Q-learningを行っていけば良いのですが、Q関数を近似するモデルを考える場合、誤差関数を定義する必要があります。 多くの場合、Q targetとTD targetとの二乗誤差が最小になるように価値関数を構築しますが、これはmean square Bellman error (MSBE)と呼ばれます。 すなわち、以下の式で表されます。

$$ L = \mathbb{E} \left[ (Q_{target} - (r + \gamma \max_{a'} Q)^{2} \right] $$

この時、求めるべき回帰モデルが$Q=f(s, a)$となります。 実際に探索と活用を行いながら、この回帰モデルを学習することで、Q関数を求めることができるようになります。

例えば、DQNでは、Q関数はニューラルネットワークで表現されますが、学習を安定させるため、以下のような工夫を行なっています。

  • MSEではなくhuber lossの活用
  • 経験再生バッファ(Experience replay buffer)からのランダムサンプリング
  • Fixed Q-targets (target Qを算出するニューラルネットの重みを数ステップ前のものを利用)
  • 報酬のクリッピング

現在では、DQNをより安定に、さらに性能を良くするための工夫を施した論文が数多く提案されています。

Fitted Q-iteration

時代が遡ることになりますが、ここからがfitted Q-iteration (FQI)の解説です。実際には、上記のもので解説はほとんど終わっており、DQNでモデルを決定木やRandom forestのようなモデルにしたものがFQIです。ニューラルネットワークにしたものはNeural fitted Q-iteration (NFP)と呼ばれています。オフライン強化学習(バッチ強化学習)では、得られたデータのみから強化学習モデルを構築します。すなわち、経験再生バッファが更新されない状況です。

アルゴリズムはFigure 1のようになっています。xが状態、uが行動となっている点に注意です。

FQI

ここでは実装面からFQIを見ていきます。

オフラインデータの収集

本来は実験などによって得られたものを使いますが、ここでは、FQIの実装確認のため、Q-learningを使ってデータを収集します。 通常、Q-learningでは状態を離散化し、Q table形式で学習をしていきます。FQIではQ関数がモデル化されるため連続値で利用できます。

import gym
rom collections import Counter, defaultdict
from queue import Queue
from typing import Tuple
import numpy as np


class QTable:
    def __init__(self, env, num_digit=6, init_qtable="random"):
        """
        Observation bounds:
            cart_x = (-2.4, 2.4)
            cart_v = (-3.0, 3.0)
            pole_angle = (-0.2094, 0.2094) # in radians, which is approx. (-12°, 12°)
            pole_v = (-2.0, 2.0)
        """
        self.env = env
        self.num_digit = num_digit

        if init_qtable == "random":
            self.q_table = np.random.uniform(
                low=0,
                high=1,
                size=(num_digit ** env.observation_space.shape[0], env.action_space.n),
            )
        else:
            self.q_table = np.zeros(
                shape=(num_digit ** env.observation_space.shape[0], env.action_space.n)
            )

        self.bound = np.array([[-2.4, 2.4], [-3.0, 3.0], [-0.2, 0.2], [-2.0, 2.0]])
        self.bins_list = [self.create_bins(x, y) for x, y in self.bound]
        self.shape = self.q_table.shape

    def create_bins(self, low, high):
        """Utility function to create bins."""
        return np.linspace(low, high, self.num_digit + 1)[1:-1]

    def digitize(self, observation: np.ndarray) -> int:
        """Returns discrete state from observation.

        This method digitizes the continuous state into discrete state.
        """
        digit = [np.digitize(obs, lst) for obs, lst in zip(observation, self.bins_list)]
        # convert n-digit to 10-digit
        ids = sum([dig * (self.num_digit**i) for i, dig in enumerate(digit)])
        return ids

    def __getitem__(self, idx):
        return self.q_table[idx]

    def __setitem__(self, key, value):
        self.q_table[key] = value

    def __repr__(self):
        return f"{self.__class__.__name__}(env={self.env}, num_digits={self.num_digit}, q_table={self.q_table})"


class Action:
    def __init__(self, env):
        self.num_action = env.action_space.n

    def greedy(self, q_table: QTable, state: int) -> int:
        return int(np.argmax(q_table[state]))

    def epsilon_greedy(self, q_table: QTable, state: int, episode: int) -> int:
        epsilon = 0.5 * (1 / (episode + 1))
        if np.random.uniform(0, 1) > epsilon:
            action = self.greedy(q_table, state)
        else:
            action = np.random.choice(self.num_action)
        return action


class QLearning:
    def __init__(self, env, num_digit, alpha=0.5, gamma=0.99, init_qtable="random"):
        self.action = Action(env)
        self.q_table = QTable(env, num_digit=num_digit, init_qtable=init_qtable)
        self.alpha = alpha
        self.gamma = gamma

    def update(self, state, action, reward, next_state) -> None:
        """Off-policy update

        Temporal difference target:
            TD = reward_{t} + \gamma * max_a Q(s_{t+1}, a)

        Update rule:
            Q(s_t, a_t) := Q(s_t, a_t) + \alpha * ( TD - Q(s_t, a_t) )
        """
        state = self.q_table.digitize(state)
        next_state = self.q_table.digitize(next_state)
        cur_q = self.q_table[state, action]
        td_target = reward + self.gamma * max(self.q_table[next_state, :])
        self.q_table[state, action] = cur_q + self.alpha * (td_target - cur_q)

    def compute_action(self, observation, episode: int) -> int:
        state = self.q_table.digitize(observation)
        return self.action.epsilon_greedy(
            q_table=self.q_table, state=state, episode=episode
        )
  • QTableでは、Q tableを作成するために各状態をbinningし、そのインデックスからn進数から10進数に変換するようにしています。
  • Actionでは、探索と活用部分はepsilon greedyを利用しています。
  • QLearningでは、TD learningに基づきQ更新するクラスを構築しました。

次にCartPoleをプレイするためのクラスを構築します。

class Agent:
    def __init__(self, env, episode=500, horizon=200):
        self.env = env
        self.episode = episode
        self.horizon = horizon
        self.episode_buffer = defaultdict(list)

    def compute_reward(
        self, done: bool, step: int, complete_episodes: int
    ) -> Tuple[float, int]:
        """Custom reward function"""
        if done:
            if step < 195:
                reward = -1.0
                complete_episodes = 0
            else:
                reward = 1.0
                complete_episodes += 1
        else:
            reward = 0.0

        return reward, complete_episodes

    def play_episodes(self, algo):
        complete_episodes = 0
        for episode in range(self.episode):
            state = self.env.reset()
            total_reward = 0.0
            for step in range(self.horizon):
                action = algo.compute_action(state, episode=episode)
                next_state, reward, done, _ = self.env.step(action)
                total_reward += reward
                my_reward, complete_episodes = self.compute_reward(
                    done, step, complete_episodes=complete_episodes
                )
                if episode >= 200:
                    self.episode_buffer[episode].append(
                        (state, action, next_state, my_reward)
                    )
                algo.update(state, action, my_reward, next_state)
                if done:
                    if episode % 100 == 0:
                        print(f"episode={episode}, total_reward={total_reward}")
                    break
                state = next_state

            if complete_episodes >= 10:
                print("10 times successes")
                break

env = gym.make("CartPole-v0")
q_learning = QLearning(env, num_digit=9, alpha=0.6, gamma=0.99)
agent = Agent(env, episode=1000)
agent.play_episodes(q_learning)

self.replay_buffer内に(状態、行動、次の状態、報酬)の軌跡(trajectory)を貯められるようにしています。 ステップ数が195回以下なら報酬が-1、それ以外は0、成功したら報酬が1となるようなスパースな報酬設定です。 ある程度、パフォーマンスの良い軌跡をバッファーに格納したいため、エピソードが200以上の軌跡を保存しています。

モデルの初期化

ここからは一つ一つ実行しながら見ていきます。 計算が高速なLightGBMを利用します。

初期のモデルには(状態、行動)から報酬を予測するモデルを作ります。

import pandas as pd
from lightgbm import LGBMRegressor

cols = ["state", "action", "next_state", "reward"]
episode_list = []
for i in range(len(agent.episode_buffer)):
    tmp = pd.DataFrame(agent.episode_buffer[i], columns=cols)
    episode_list.append(tmp)

data = pd.DataFrame(pd.concat(episode_list), columns=cols))
states = np.vstack(data["state"].to_numpy())
actions = data["action"].to_numpy()
X = np.c_[states, actions.reshape(-1, 1)]
y = data["reward"].to_numpy()
model = LGBMRegressor()
model.fit(X, y)

このモデルの出力結果は以下のようになります。(状態、行動)からは正しく報酬を予測できるモデルができているとは言い難い見た目になっています。

import matplotlib.pyplot as plt
plt.scatter(reward, model.predict(X))

reward=f(s, a)

FQIの実行

先ほど収集したデータを使ってQ targetを算出します。 得られたものを使ってQ関数(LightGBM)を学習します。

def compute_q_target(args):
    row, model, gamma = args
    next_state = np.array(row["next_state"])
    q_values_next_state = [
        model.predict(np.append(next_state, a).reshape(1, -1))[0] for a in [0, 1]
    ]
    max_q_value_next_state = max(q_values_next_state)
    return row["reward"] + gamma * max_q_value_next_state

def get_q_target(data, model, gamma=0.99, n_jobs=1):
    with Pool(n_jobs) as p:
        args_list = [(row, model, gamma) for _, row in data.iterrows()]
        q_targets = list(p.map(compute_q_target, args_list))
    return q_targets

# Fitted Q-iteration
q_target_list = []
rmse_list = []
for i in tqdm(range(10)):
    q_targets =get_q_target(data, model, gamma=0.99, n_jobs=8)  
    model.fit(X, q_targets)
    q_target_list.append(q_targets)
    if i > 0:
        rmse = np.sqrt(mean_squared_error(q_targets, q_target_list[i-1]))
        rmse_list.append(rmse)
        print("RMSE of Q:", round(rmse, 4))
plt.scatter(q_targets, model.predict(X))

結果を見るとQ関数を予測できていそうです。誤差(MSBE)も下がっています。

mode output

Loss of mean squared Bellman error

最後は得られたモデルを使って実際にCartPoleをプレイします。

total_reward_list = []
for i in tqdm(range(100)):
    env = gym.make("CartPole-v0")
    #env.seed(i)
    obs = env.reset()
    total_reward = 0.0
    q_values_list = []
    while True:
        q_values = [model.predict(np.append(obs, a).reshape(1, -1))[0] for a in [0, 1]]
        q_values_list.append(q_values)
        action = np.argmax(q_values)
        obs, reward, done, info = env.step(action)
        total_reward += reward
        if done:
            break
    total_reward_list.append(total_reward)    
    
print(np.mean(total_reward_list),"+/-", round(np.std(total_reward_list), 4))
> 95.23 +/- 3.187

実際に得られたiterationを20回程度行ったモデルを使うと、ある程度連続して成功できるようなQ関数(LightGBM)が学習できました。 初期の収集したデータに依存して結果はばらつきそうです。200回連続して得られるような場合もありましたが、大体は上記のような結果です。

課題

FQIは非常に単純ながらQ関数を近似することができ、環境とのやり取りが行えないよう状況では強力なオフライン強化学習手法の1つです。しかしながら、オフライン強化学習全般の欠点として、Q値の過大評価があります。Conservative Q-learing (CQL)のような方法だと正則化などを 導入し、この問題を軽減することができています。FQIではモデルが木構造のものを用いるので、見たことのない状況(O.O.D)では、ワークしないと思われます。ニューラルネットワークを利用したFQIの方が良いと思いますが、学習の不安定性が残ります。とりあえずオフライン強化学習をするならCQLを行うのが良いと思います。

WSL2にSSHをする

最近、自宅PCとして強いマシンを購入(intel i9 12900KS, DDR4 3200 128GB, RTX3090)しました。累計52万円とかなりお高い(電気代も高い)ですが、非常に快適です。

ゲームなどもしたいのでWindows 10にしましたが、せっかく良いGPUを積んでいるので、WSL2にpytorch環境を作ろうと思います。WSL2への接続方法は、以下の記事を参考にしています。

qiita.com

WSL2

WSL2はWSL1とネットワーク構成が異なり、直接sshをするのが困難です。ポートフォワーディングを行う必要があります。

WSL2のインストールは公式ドキュメントに従って行うが良いです。何も考えずにWindowsストアからUbuntuを入れるとWSL1が入るかと思います。自分はそこでWSL1からWSL2にアップグレードする際に、詰まり時間をロスしました。カーネルのアップグレードが必要となります。

docs.microsoft.com

SSH serverの設定

WSL2が入ったらopenssh-serverの設定をしておきます。

sudo apt update
sudo apt install openssh-server

あとは公開鍵の設定などをしてログインできるようにしておきます。WSLの場合は

ssh user@localhost 

でもログインできるようになります。

WSL-CUDA

WSLのCUDA Tool kitはNvidia公式から入れることができます。

CUDA Toolkit 11.6 Update 2 Downloads | NVIDIA Developer

Windows ポートフォワーディング

次に管理者権限でWindows側のPowershellを開きます。Ubuntu-20.04の場合を記載します。wsl -l -vで自身のバージョンは確認できます。

wsl -d Ubuntu-20.04 -u root exec service ssh start
$IP=wsl -d Ubuntu-20.04 exec hostname -I
netsh.exe interface portproxy reset
netsh.exe interface portproxy add v4tov4 listenport=22 connectaddress=$IP connectport=22
netsh.exe interface portproxy add v4tov4 listenport=8888 connectaddress=$IP connectport=8888

# IP Helperサービスの起動
sc.exe config iphlpsvc start=auto  # ブート時の自動設定
sc.exe start  iphlpsvc  # サービスの起動

1行目でWSLのopen-sshserverを起動します。 2行目でWSLの内部IPを取得します。WSLを起動しなおすと毎回IPが異なってしまうため、そのたびにIPを取得する必要があります。 3行目はポートフォワーディングをリセットします。これは以下のように直接削除してもよいかと思います。

netsh.exe interface portproxy delete v4tov4 listenport=22 listenaddress=*

4, 5行目でポート22や8888のポートを聞き、WSL2のアドレス:ポートに転送してくれます。そのため、外部PCからWindows側にsshhttp://PC名:8888にアクセスするだけでWSL側につなぐことができます。その際の注意点はWindows側でファイアウォールを開けておくことです。以下のコマンドで何が有効か確認できます。

PS C:\Users\name> netsh interface portproxy show all

ipv4 をリッスンする:         ipv4 に接続する:

Address         Port        Address         Port
--------------- ----------  --------------- ----------
*               8888        172.21.119.130  8888
*               22          172.21.119.130  22

PS C:\Users\admin>

6, 7行目のコマンドでポートフォワーディングが有効になります。このスクリプトportforward_wsl.ps1とし、タスクスケジューラーでブート後に実行するようにすれば完成です。正しく設定されていれば、WSL2のubuntuに接続できるようになります。

C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe -ExecutionPolicy RemoteSigned ".\portfoward_wsl.ps1

最終的に

上記のようにしたところWindowsからWSLへの自動ポートフォワーディングとバッティングして上手くいかず、最終的に以下のようにしました。

$HOST_IP=192.168.1.2  # 例
netsh.exe interface portproxy reset
netsh.exe interface portproxy add v4tov4 listenaddress=$HOST_IP listenport=22 connectaddress=localhost connectport=22
netsh.exe interface portproxy add v4tov4 listenaddress=$HOST_IP listenport=8888 connectaddress=localhost connectport=8888

WSL2の機能として、localhostに接続すると自動でWSL2のIPに転送してくれる機能がデフォルトでTrueとなっています。例えば、Jupyter notebookなどはhttp://localhost:8888を入力すると自動でhttp://172.31.236.31:8888のところに転送してくれているため、このアドレスに到達することができています。この172.31.236.31はWSL2の内部IPになっています。結局、色々試しましたが、接続先IPからlocalhostに飛ばすようにしてあげればよいだけでした。これだと動的に変わるWSL2のIPを意識する必要はなくなりそうです。このスクリプトはタスクスケジューラに追加します。

https://developer.nvidia.com/cuda-downloads?target_os=Linux&target_arch=x86_64&Distribution=WSL-Ubuntu&target_version=2.0&target_type=deb_local

MDP: Value Iteration

NeurIPS 2021のOutstanding papersの1つであるOn the Expressivity of Markov Rewardを理解したいと思いつつ、実験を通じて色々理解を深めようとしてきました(全然できていません)。 論文の内容としては、マルコフ決定過程(MDP)において、3通りのタスクの形で表現でき、そのとき

  • いかなる報酬関数によっても実現できないタスクがある

  • 仮に報酬関数が存在するならば、多項式時間で報酬設計問題を解くことができる

ということを示しています。マルコフ報酬の理論体系の提案と、上記の証明、実際のタスクにおいて本手法を使った報酬設計を行うと既存のQ-learningよりも早く収束することを示しています。

論文としては非常に洗練されており、自分もこのような形で書けたら良いなと思う次第です。

[論文] arxiv.org

[ブログ] deepmind.com

タスクとは?

MDPの仮定の下で、3つのタスクを1. Set of Acceptable Polices (SOAP), 2. Policy Ordering (PO), 3. Trajectory Ordering (TO)として表しています。基本的にマルコフ報酬の下ではこの形で表現できます。

f:id:udnp:20220220220739p:plain

引用元:DeepMindブログ

この3つのタスクの下で、「報酬」によってタスクを表現できるか研究しています。

Set of Acceptable Polices

例) 10ステップ以内にゴールに到達しよう

$$ V^{\pi_g} (s_0) \gt V^{\pi_b} (s_0) $$

初期状態の価値が大きいものが良くなります(g=good, b=bad)。方策の集合で状態価値の初期値が最も良いのを選択する場合がSOAPに対応しています。MDPでよく使われる下図でみてみます。何度か行動を行って得られた方策を使い、$V(s_0)$の値が最も大きいものを使用します。

f:id:udnp:20220220231417p:plain

たとえば、ある方策(ランダム)、時間割引率0.99、スリップなしで行動が行われた時の状態価値は以下の通りです。

1 2 3 4
1 0.9801 0.99 1.0 goal
2 0.9702 wall 0.99 fire
3 0.9605 0.9702 0.9801 0.9702

5は壁、7は火(報酬-1)、11はゴール(報酬+1)です。この通りに行動を行うならば$s_0 \rightarrow s_1 \rightarrow s_2 \rightarrow s_6 \rightarrow s_{10}$ もしくは $s_0 \rightarrow s_4 \rightarrow s_8 \rightarrow s_9 \rightarrow s_{10}$となります。

Policy Ordering

例) 5ステップでゴールに到達するのが望ましい、もしくは10ステップ以内、もしくは気にしない。

$$ V^{\pi_1} (s_0) \gt V^{\pi_2} (s_0) \gt \ldots $$

SOAPでは良い方策か悪い方策かの集合を考えていましたが、POではその中でも順序を考えます。

Trajectory Ordering

例) 安全にゴールに到達して危ない所は避けるのが望ましい。

$$ G(τ_1; s_0) \gt G(τ_2; s_0) \ldots $$

累積報酬が最大なものの方が良い。

Value Iteration

さて、上図の環境で強化学習を行います。ベルマン方程式を考えると、累積報酬の期待値が最大となる価値の推定はDynamic Programmingを行えば得られます。

最適ベルマン方程式の状態価値関数の更新式

$$ V(s) = \sum P(s'|s, \pi(s)) (R(s, \pi(s), s') + \gamma V(s')) $$

更新ルールは状態$s$において行動$a \sim \pi(\cdot| s)$を選択したときの報酬と、$s'$の状態価値の値を使って更新していきます。 状態遷移が複数ある場合では、各遷移確率分の値の総和を取ります。

 s_0から行動 a_0 に対して遷移 s_1, s_2が観測された場合、

$$ V(s_0) = \frac{s_1}{s_1 + s_2 } (R(s_0, a_0, s_1) + \gamma V(s_1)) + \frac{s_2}{s_1 + s_2 } (R(s_0, a_0, s_2) + \gamma V(s_2)) $$

となります。状態遷移確率は基本的に不明なため、ここでは到達回数の比率で計算します。 更新はバックワードに行われます。これは状態価値・行動価値関数の推定どちらでも同じものになります。

Temporal difference learning

Value iterationでは、最後まで行動を行い価値関数を更新します。 一方、Temporal difference (TD) learningに代表されるQ-learningでは、行動を最後まで行わなくても価値関数を更新できる方法です。

TDのターゲットは以下の式になります。 $$ \text{TD}_\text{target} = R(s_t, \pi(s_t)) + \gamma V(s_{t+1}) $$

Q-learningの状態価値の推定は、行動価値関数の最大値を使います。

$$ V(s_{t+1}) = \max_{a} Q(s_{t+1}, a) $$

SARSAでは、方策(状態価値)を使って行動し、その時の状態価値を使います。

$$ V(s_{t+1}) = Q(s_{t+1}, \pi(s_t)) $$

このように価値の推定の仕方にはバリエーションがあります。そしてベルマン方程式を解くためにTD targetと、現在の価値との差、TD誤差を使って更新を行っていきます。

$$ Q(s_t, a_t): =Q(s_t, a_t) + \alpha (\text{TD}_\text{target}- Q(s_t, a_t)) $$

αは学習率で、大きいほどTD誤差の影響が大きくなります。 TD誤差が大きい場合は、その行動はより価値が高い方更新されます。TD(0) learningとも呼ばれます。

サンプルコード

Grid world (FrozenLake-v0)でValue iterationとQ-learningを試した例は以下になります。

github.com

V learning

value iterationでは報酬テーブル、価値テーブル、状態遷移が必要になるため、これらを辞書形式で保存するようにします。

import gym
from collections import Counter, defaultdict

class Agent:
    def __init__(self, env):
        self.env = env
        self.transitions = defaultdict(Counter)  # (state, action)
        self.reward_table = defaultdict(float)   # (state, action, next_state)
        self.value_table = defaultdict(float)    # (state, (action))

    def set_state_action(self, state, action, reward, next_state):
        self.reward_table[state, action, next_state] = reward
        self.transitions[state, action][next_state] += 1

状態遷移確率は上述したようにパスを通った回数の比率で算出するためCounterで求めていきます。1ステップごとに(state, action, reward, next_state)を記録するようにします。

状態価値の推定は$V(s) = \sum P (s' |s, a) ( R(s, a, s') + \gamma V(s))$にて行います。その式が以下の通りです。ある状態から次の状態に変化したときの価値を求め更新してきます。

def compute_state_value(self, state, action):
        target_counts = self.transitions[state, action]
        total = sum(target_counts.values())
        state_value = 0.0
        for target_state, count in target_counts.items():
            reward = self.reward_table[state, action, target_state]
            state_value += (count / total) * (
                  reward + self.gamma * self.value_table[target_state]
         )
        return state_value

最終的に得られた値をもとに状態価値を更新します。 観測された全ての状態に対して価値を求め、その中で最大のものが状態価値となり、これを繰り返していくのがvalue iterationです。 行動を行うたびに前の状態価値が伝播され最初の状態価値が求まっていきます。

def value_iteration(self):
        for state in range(self.env.observation_space.n):
            state_values = [
                self.compute_state_value(state, action)
                for action in range(self.env.action_space.n)
            ]
            self.value_table[state] = max(state_values)

Q-learning

Qに対してもvalue iterationを使うことができます。違いは最大となるQの値を使って行動を行っていくため、行動に対する各価値も推定していきます。

TD learningでのQ-learningでは状態遷移が必要となりません。そのため必要なのは価値テーブルだけです。モデルフリーの学習になります。

class Qlearning2:
    def __init__(self, env, gamma=0.99, alpha=0.9):
        self.env = env
        self.gamma = gamma
        self.alpha = alpha
        self.state = self.env.reset()
        self.value_table = defaultdict(float)

    def sample(self) -> Tuple[np.ndarray, int, float, np.ndarray]:
        action = self.env.action_space.sample()
        cur_state = self.state
        new_state, reward, done, _ = self.env.step(action)
        self.state = self.env.reset() if done else new_state
        return cur_state, action, reward, new_state

    def update(self, state, action, reward, next_state):
        max_q, _ = self.best_value_action(next_state)
        td_target = reward + self.gamma * max_q
        cur_q = self.value_table[state, action]
        self.value_table[state, action] = cur_q + self.alpha * (td_target - cur_q)

    def best_value_action(self, state):
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.value_table[state, action]
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_value, best_action

    def run_episode(self, env):
        total_reward = 0.0
        state = env.reset()
        while True:
            _, action = self.best_value_action(state)
            new_state, reward, done, _ = env.step(action)
            total_reward += reward
            if done:
                break
            state = new_state
        return total_reward

sample()では、ランダムの行動を通して(state, action, reward, next_state)のペアを集めていきます。update()ではQ-learningの更新ルールに従って価値テーブルを更新します。best_value_action()では価値テーブルの中から、とある状態のとき最大の価値とその行動を返します。CartPoleでもそうですが、報酬設計をうまくしないと価値テーブルの更新がうまくいきません。

結果

main.pyに実験コードを記載しています。

import gym

from grid_world.agent import Qlearning, Vlearning
from grid_world.env import CustomMaze

if __name__ == "__main__":
    env = gym.make("FrozenLake-v1")
    # env = CustomMaze()

    test_env = gym.make("FrozenLake-v1")
    agent = Vlearning(env, gamma=0.99, epsilon=0.0)
    # agent = Qlearning(env, gamma=0.99, epsilon=0.0)
    iter_n = 0
    best_reward = 0.0
    test_episode = 20
    while True:
        iter_n += 1
        agent.play_random_steps(100)
        agent.value_iteration()

        reward = 0.0
        for _ in range(test_episode):
            reward += agent.play_episode(test_env)
        reward /= test_episode
        if reward > best_reward:
            print(f"Best reward {best_reward:.3f} -> {reward:.3f}")
            best_reward = reward
        if reward > 0.8:
            print(f"Solved: {iter_n} iterations. Best reward: {best_reward}.")
            break
Best reward 0.000 -> 0.500
Best reward 0.500 -> 0.650
Best reward 0.650 -> 0.700
Best reward 0.700 -> 0.750
Best reward 0.750 -> 0.800
Best reward 0.800 -> 0.950
Solved: 8 iterations. Best reward: 0.95.

最後に

改めて実装するとvalue iterationやQ-learningで忘れていたところを思い出せたような気がします。次回(できたら)は、方策勾配法(PG)、信頼領域を使った方策最適化(TRPO)、近接方策最適化(PPO)などを実装して理解を深めていきたいと思います。 PG, TRPO, PPOは方策にニューラルネットを使うためRllibの勉強もかねて、サンプルコードをなぞりながら実装していきたいです。

RAY RLlib

Rayは分散処理を計算するためのAPIです。その中でも特にRLlibは強化学習に特化したライブラリになっています。

シミュレーション環境さえ用意できれば、強化学習はいかに並列計算を行うかが大事になってきます。 Open MPIが有名かと思いますが、Rayを使えばノード間分散処理といった面倒な実装のところも簡単に実装することができます。

docs.ray.io

使い方

非常に簡単です。例えばCartPoleのタスクをPPOを32並列、1GPUで実行するには以下のようにconfigを設定します。

from ray.rllib.agents.ppo import PPOTrainer

# Configure the algorithm.
config = {
    "env": "CartPole-v1",
    "num_workers": 32,
    "num_gpus": 1,
    "framework": "torch", #  or "tf"
    "model": {
        "fcnet_hiddens": [64, 64],
        "fcnet_activation": "relu",
    },
    "evaluation_num_workers": 1,
    "evaluation_config": {
        "render_env": False,
    }
}

# Create our RLlib Trainer.
trainer = PPOTrainer(config=config)

for _ in range(100):
    print(trainer.train())

# Evaluate the trained Trainer (and render each timestep to the shell's output).
trainer.evaluate()

また、DQNの優先度付経験再生のサンプリングを分散に行えるようにしたAPEX(Distributed Prioritized Experience Replay)を用いる場合も

from ray.rllib.agents.dqn import ApexTrainer
trainer = ApexTrainer(config=config)

と変更するだけです。アルゴリズムは代表的なものが多数実装されています。

あとは計算資源を用意してあげるだけです。ノード間並列もray.init()にてIPを設定してあげればできます。 AWS上のEKSなどでAutoScalingもできそうですが、こちらもいずれ試していきたいと思います。

計算結果はTensorboardに記録されます。だいたい2分くらいで最大の報酬500に到達しています。

f:id:udnp:20220205185454p:plain

独自の環境、カスタムモデル、損失関数など拡張性ができるようになっています。まずはRayに実装されている強化学習手法を試してダメならモデルや損失関数を変更するというのがよさそうです。

Trainer config

強化学習の細かい設定などはTrainer configから参照できます。

https://docs.ray.io/en/latest/rllib-training.html#common-parameters

ハイパーパラメータ管理(2):MLflow + Optuna

前回の記事に引き続き、MLflow用いた機械学習のライフサイクルの管理をしていきます。

今回はグリッドサーチではなくベイズ最適化によるハイパーパラメータ探索を行っていきます。また、MLflowのモデルバージョン管理機能を用いていきます。

Optuna

Preferred Networkが開発しているハイパーパラメータチューニングのフレームワークです。私はRay Tuneを使うことが多いのですが、日本語によるサンプルコードが多いのと社内でよく使われることもあって試してみます。mlflowとも連携ができますが、各runごとにモデルのアーティファクトを保存したい場合、少し使いにくいです。

github.com

サンプルコード

LightGBMのハイパーパラメータを探索します。まずは、目的関数を設定します。

def objective(trial):
    # CVの方が良いが簡単のためホールドアウト
    data, target = sklearn.datasets.load_breast_cancer(return_X_y=True)
    train_x, test_x, train_y, test_y = train_test_split(data, target, test_size=0.25)
    dtrain = lgb.Dataset(train_x, label=train_y)

    # ここはhydraで管理した方が良いかも?
    # https://github.com/optuna/optuna/tree/master/examples/hydra
    param = {
        "objective": "binary",
        "metric": "binary_logloss",
        "verbosity": -1,
        "boosting_type": "gbdt",
        "lambda_l1": trial.suggest_loguniform("lambda_l1", 1e-8, 10.0),
        "lambda_l2": trial.suggest_loguniform("lambda_l2", 1e-8, 10.0),
        "num_leaves": trial.suggest_int("num_leaves", 2, 256),
        "feature_fraction": trial.suggest_uniform("feature_fraction", 0.4, 1.0),
        "bagging_fraction": trial.suggest_uniform("bagging_fraction", 0.4, 1.0),
        "bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
        "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
    }

    gbm = lgb.train(param, dtrain)
    preds = gbm.predict(test_x)
    pred_labels = np.rint(preds)
    accuracy = sklearn.metrics.accuracy_score(test_y, pred_labels)
    return accuracy

次に、mlflowのコールバック関数を記載します。

def mlflow_callback(study, trial):
    trial_value = trial.value if trial.value is not None else float("nan"))
    with mlflow.start_run(experiment_id=experiment_id, run_name=study.study_name):
        mlflow.log_params(trial.params)
        mlflow.log_metric("accuracy", trial_value)

各トライアルにおいてハイパラとメトリックの結果を保存するように設定します。あとは実行するだけです。

if __name__ == "__main__":
    # 実験名と同じ階層の実験idを抽出する
    experiment_name = 'optuna'
    client = mlflow.tracking.MlflowClient(tracking_uri=mlflow.get_tracking_uri())
    for exp in client.list_experiments():
        if experiment_name == exp.name:
            experiment_id = exp.experiment_id
            break
    else:
        experiment_id = client.create_experiment(experiment_name)

    # ベイズ最適化の実行
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=100, callbacks=[mlflow_callback])

    print("Number of finished trials: {}".format(len(study.trials)))
    print("Best trial:")
    trial = study.best_trial
    print(f"Value: {trial.value}")
    print("Params:")
    for key, value in trial.params.items():
        print(f"{key}: {value}")

実行結果は以下のようになります。

[I 2021-03-08 22:56:28,200] A new study created in memory with name: no-name-84a20927-29be-4994-bd3b-cc311d81fd17
[I 2021-03-08 22:56:28,247] Trial 0 finished with value: 0.972027972027972 and parameters: {'lambda_l1': 4.215060077417707e-05, 'lambda_l2': 0.015415839449936278, 'num_leaves': 238, 'feature_frac
tion': 0.6673400621108638, 'bagging_fraction': 0.7442018650062918, 'bagging_freq': 2, 'min_child_samples': 41}. Best is trial 0 with value: 0.972027972027972.
[I 2021-03-08 22:56:28,487] Trial 1 finished with value: 0.958041958041958 and parameters: {'lambda_l1': 4.533990836656474, 'lambda_l2': 0.8259430522162956, 'num_leaves': 56, 'feature_fraction':
0.9744784080586081, 'bagging_fraction': 0.5414331193881199, 'bagging_freq': 3, 'min_child_samples': 9}. Best is trial 0 with value: 0.972027972027972.

MLflow ui

得られた実行結果は次のように保存されています。

f:id:udnp:20210308231617j:plain
mlflow ui optuna

モデルのバージョン管理

さて、得られたデータからモデルのバージョン管理をしていきます。

f:id:udnp:20210308231811j:plain
MLflow Model

ここから少し手順が面倒になってきます。というのもartifactとmodelのメタ情報は別々に保存していく必要があるからです。

MLflow Trackingを見るとArtifactを保存するパターンは4つほどあります

  1. MLflow on localhost
  2. MLflow on localhost with SQLite
  3. MLflow on localhost with Tracking Server
  4. MLflow with remote Tracking Server, backend and artifact stores

シナリオ1の場合は、特に何も考えずに実行したらMLflow側でmlrunsのフォルダを作成してくれます。experiment_idを指定してあげれば実験ごとにフォルダを作成してくれます。 ここでは、シナリオ2の、localhost + SQLiteの場合を試してみます。

MLflow on localhost with SQLite

ここで必要なのはSQLAlchemyです。データベースのスキーマはMLflow側で作成してくれるので、空のDBを作成します。

from sqlalchemy import create_engine

engine = create_engine('sqlite:///mlruns.db', echo=True)

次にMLflowのサンプルコードを実行しています。

def run():
    # Create two runs Log MLflow entities
    mlflow.set_tracking_uri('sqlite:///mlruns.db')
    with mlflow.start_run() as run1:
        params = {"n_estimators": 3, "random_state": 42}
        rfr = RandomForestRegressor(**params).fit([[0, 1]], [1])
        mlflow.log_params(params)
        mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model")

    with mlflow.start_run() as run2:
        params = {"n_estimators": 6, "random_state": 42}
        rfr = RandomForestRegressor(**params).fit([[0, 1]], [1])
        mlflow.log_params(params)
        mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model")

    # Register model name in the model registry
    name = "RandomForestRegression"
    client = mlflow.tracking.MlflowClient()
    client.create_registered_model(name)
 
 # 名前が重複する場合はエラーになるので例外処理を追加しておく
    try:
        client.get_registered_model(name)
    except:
        client.create_registered_model(name)

    # Create a two versions of the rfr model under the registered model name
    for run_id in [run1.info.run_id, run2.info.run_id]:
        model_uri = "runs:/{}/sklearn-model".format(run_id)
        mv = client.create_model_version(name, model_uri, run_id)
        print("model version {} created".format(mv.version))
    print("--")

    # Fetch the last version; this will be version 2
    mv = client.get_model_version(name, mv.version)
    print_model_version_info(mv)

これが行っているのは、各runごとにRandomForestを作成して、各パラメータと学習済みモデルを保存しています。それをmlflow.tracking.MlflowClient()を使って、モデルを登録していきます。ここで大事なのは、初めにmlflow.set_tracking_uri('sqlite:///mlruns.db')でtracking先をdbにしていることです。

f:id:udnp:20210308233523j:plain
ER図

ER図を見てもわかるようにartifact_uriがモデルを参照する場所になっています。今回ではローカルに保存していますが、AWS S3などにも簡単に保存できるようになっています。

面倒なのは、clientmlflow.run_start()です。clientの場合はrun_idを作成できるのですが、フォルダが作成されないので、最初にwith mlflow.run_start():を実行してあげる必要があります。この操作を行わないとくMLflowのUI上からアーティファクトを参照することができなくないため、エラーとなっててしまいます。

optunaの場合はベストなハイパラを使って再度学習させる必要があるので、最後にモデルを保存するようなコードを追加すればokです。

if __name__ == '__main__':
    data = sklearn.datasets.load_breast_cancer(return_X_y=False)
    train_x, test_x, train_y, test_y = train_test_split(data.data, data.target, test_size=0.25)
    dtrain = lgb.Dataset(train_x, label=train_y)
    
    # optunaで得られたベストなパラメータを使って学習
    lgb_model = lgb.train(trial.params, dtrain)
    preds = lgb_model.predict(test_x)
    pred_labels = np.rint(preds)
    accuracy = sklearn.metrics.accuracy_score(test_y, pred_labels)

    # データのスキーマを登録する
    df = pd.DataFrame(data.data, columns=data.feature_names)
    signature = infer_signature(df, preds)

    with mlflow.start_run(experiment_id=experiment_id) as run:
        mlflow.log_params(trial.params)
        mlflow.log_metric('accuracy', accuracy)
        mlflow.lightgbm.log_model(lgb_model, artifact_path='lightgbm-model', signature=signature)

    name = 'LightGBM'
    tags = {'data': 'breast cancer'}
    try:
        client.get_registered_model(name)
    except:
        client.create_registered_model(name)

    run_id = run.info.run_id
    model_uri = "runs:/{}/lightgbm-model".format(run_id)
    mv = client.create_model_version(name, model_uri, run_id, tags=tags)
    print("model version {} created".format(mv.version))

このようにすることでモデルのバージョン管理を行うことができます。モデルの精度が悪化してきた場合は、このスクリプトを実行することで継続的にメンテナンスできます。

デプロイ

mlflowはモデルをデプロイする機能もあり、REST APIで予測結果を返すこともできます。仮想環境やDocker環境を自動で構築してくれます。

mlflow serve models -m <モデルのディレクトリ>

予測を行う場合はPOSTを行います。

import requests
url = 'http://127.0.0.1:5000/invocations'
headers = {'Content-Type': 'application/json'; 'format': 'pandas-split'}

r = requests.post(url, df.to_json('split'), headers)

もしくは、単純に以下の形式でもよいです。

curl -X POST -H "Content-Type:application/json; format=pandas-split" --data "{\"columns\":[\"alcohol\", \"chlorides\", \"citric acid\", \"density\", \"fixed acidity\", \"free sulfur dioxide\", \"pH\", \"residual sugar\", \"sulphates\", \"total sulfur dioxide\", \"volatile acidity\"],\"data\":[[12.8, 0.029, 0.48, 0.98, 6.2, 29, 3.33, 1.2, 0.39, 75, 0.66]]}" http://127.0.0.1:1234/invocations

モデルの読み込みも簡単です。

loaded_model = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path)

各MLflowが提供しているModelAPIを使ってもよいと思います。

最後に

MLflowを使うとモデル管理を本格的に行うことができます。Microsoft Azure MLやAmazon SageMaker等にデプロイができるようなので、次回はSageMaker (or Kubernetes)でデプロイをやってみようと思います。

実際の業務でモデル選定やハイパラ調整で時間を大きく割くのは無駄(記述子・特徴量生成の方が大事)なので、簡単なテーマならさくっとモデルを構築してすぐプロダクトを提供できるような形にしていきたいです。

ハイパーパラメータ管理:Mlflow と Hydra

機械学習を使ったプロジェクトに携わると、データ解析をしていくにつれて大量のモデルが構築され、これらのモデルを管理するだけでも大変です。IT企業のようなソフトウェアエンジニアリングの部署がある企業では、各社ベストプラクティスがあるのだと思いますが、私の場合、メーカー勤務かつ部署が研究開発ということもあり、ソフトウェアエンジニアリングのノウハウを学ぶことはできず、手探りで探索しているような状況です。となるとOSSを使うことになるのですが、機械学習のライフサイクル管理もかねてmlflowを本格的に使っていきたいと思います。

mlflow.org

また、facebook researchが作成しているHydraと呼ばれるconfig管理フレームワークを導入します。これはargparseで書いていた部分をyamlで書くことで簡略化できるようになっています。今まではスクリプト上で煩雑だったものが、yamlで管理できるため非常に見やすくなります。

hydra.cc

これら両方を用いて、ElasticNetのハイパーパラメータをグリッドサーチしていきたいと思います。

Hydra

インストール

pip install hydra-core --upgrade

使い方

まずはconfig.yamlに設定ファイルを記載していきます。

params:
  alpha: 0.5
  l1_ratio: 0.5

file:
  path: 'sklearn_elasticnet_wine/winequality-red.csv'

次に、python側では以下のようにして読み込みます。

import hydra
from omegaconf import DictConfig, OmegaConf

@hydra.main(config_name='config')
def main(cfg: DictConfig) -> None:
        print(OmegaConf.to_yaml(cfg))

if __name__ == "__main__":
    my_app()

コマンドライン引数を読み込みたい関数にhydraをデコレートしてあげます。実行すると以下のようになります。

$ my_app.py 
params:
  alpha: 0.5
  l1_ratio: 0.5

file:
  path: 'sklearn_elasticnet_wine/winequality-red.csv'

オーバーライドをするのも簡単です。

$ my_app.py params.alpha=1.0 params.l1_ratio=1.0
params:
  alpha: 1.0
  l1_ratio: 1.0

file:
  path: 'sklearn_elasticnet_wine/winequality-red.csv'

コマンドライン引数を変更した場合はoutputsディレクトリが自動で生成されログが出力されます。

.hydra
├── config.yaml
├── hydra.yaml
└── overrides.yaml

Multirun

また、複数の引数を実行することもできます。

$ my_app.py params.alpha=0.1,0.5,1.0 params.l1_ratio=0.1,0.5,1.0
[2021-03-06 15:49:32,780][HYDRA] Launching 9 jobs locally
[2021-03-06 15:49:32,781][HYDRA]        #0 : params.alpha=0.1 params.l1_ratio=0.1
[2021-03-06 15:49:32,979][HYDRA]        #1 : params.alpha=0.1 params.l1_ratio=0.5
[2021-03-06 15:49:33,156][HYDRA]        #2 : params.alpha=0.1 params.l1_ratio=1.0
[2021-03-06 15:49:33,351][HYDRA]        #3 : params.alpha=0.5 params.l1_ratio=0.1
[2021-03-06 15:49:33,535][HYDRA]        #4 : params.alpha=0.5 params.l1_ratio=0.5
[2021-03-06 15:49:33,722][HYDRA]        #5 : params.alpha=0.5 params.l1_ratio=1.0
[2021-03-06 15:49:33,919][HYDRA]        #6 : params.alpha=1.0 params.l1_ratio=0.1
[2021-03-06 15:49:34,095][HYDRA]        #7 : params.alpha=1.0 params.l1_ratio=0.5
[2021-03-06 15:49:34,285][HYDRA]        #8 : params.alpha=1.0 params.l1_ratio=1.0

このようにすることでグリッドサーチによるハイパーパラメータ探索のコードを簡単に記述することができます。

Mlflow

mlflowを用いれば各実行条件ごとのメトリック、モデル、ハイパーパラメータを適切にログを取ることができ、可視化をすることができます。今回はグリッドサーチですが、optunaのようなベイズ最適化を用いた探索の場合では、非常に簡単にログを取ることができます。

インストール

pip install mlflow

簡単な使い方

import os
from random import random, randint
import mlflow

if __name__ == '__main__':
    with mlflow.start_run(run_id=0):
        mlflow.log_param("param1", randint(0, 100))
        for i in range(3):
            mlflow.log_metric("foo", random() + float(i))

        # Log an artifact (output file)
        if not os.path.exists("outputs"):
            os.makedirs("outputs")
        with open("outputs/test.txt", "w") as f:
            f.write("hello world!")
        mlflow.log_artifacts("outputs")

各計算結果をlog_metriclog_param等で保存していきます。run_idは自動で生成されますが自分でつけることもできます。

mlflow + hydra

ネットで調べてみると、グリッドサーチを試してみたという例は複数ありました。私もすごく単純な例で試していきます。

ベースはmlflow/examples/sklearn_elasticnet_wineのサンプルコードです。 https://github.com/mlflow/mlflow/tree/master/examples/sklearn_elasticnet_wine

Metric

import os
import warnings
import logging

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet

import mlflow
import hydra
from omegaconf import DictConfig

logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)

EXPERIMENT_NAME = 'elasticnet_wine'

def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return {'rmse': rmse, 'mae': mae, 'r2': r2}

def fetch_data(cfg: DictConfig):
    data = pd.read_csv(hydra.utils.to_absolute_path(cfg.file.path), sep=";")
    train, test = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]
    return train_x, test_x, train_y, test_y

まずは基本的なパッケージを読み込みます。

mlflowで使う実験名も最初に指定しています。データやモデルの条件が変わった場合は実験名を変えるのかなと思います。 メトリックを計算する関数と、データを読み込む関数を指定します。hydraの注意点ですが、実行時にパスが自動的に変更されてしまいますので、hydra.utils.to_absolute_path()を使って実行時のパスを取得してあげる必要があります。

Main

@hydra.main(config_name='config')
def main(cfg: DictConfig):
    warnings.filterwarnings('ignore')
    np.random.seed(40)
    hydra_path = os.getcwd()
    train_x, test_x, train_y, test_y = fetch_data(cfg)

    alpha = float(cfg.params.alpha)
    l1_ratio = float(cfg.params.l1_ratio)
    lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
    lr.fit(train_x, train_y)
    predicted_qualities = lr.predict(test_x)
    metrics = eval_metrics(test_y, predicted_qualities)

    # MLFLOW
    os.chdir(hydra.utils.get_original_cwd())
    client = mlflow.tracking.MlflowClient()

    # データごとにExperimentを作成
    for exp in client.list_experiments():
        if EXPERIMENT_NAME == exp.name:
            experiment_id = exp.experiment_id
            break
    else:
        experiment_id = client.create_experiment(EXPERIMENT_NAME)

    run_id = client.create_run(experiment_id).info.run_id

    for k, v in cfg.params.items():
        client.log_param(run_id, k, v)

    for k, v in metrics.items():
        client.log_metric(run_id, k, v)

    hydra_files = ['.hydra/config.yaml', '.hydra/hydra.yaml', '.hydra/overrides.yaml']
    for hydra_file in hydra_files:
        client.log_artifact(run_id, os.path.join(hydra_path, hydra_file))

    with mlflow.start_run(run_id=run_id):
        mlflow.sklearn.log_model(lr, "model")

    client.set_terminated(run_id)

注意点は、先ほどと同様にhydraが自動でログを作成するのでパスが変わってしまうことです。 os.chdir(hydra.utils.get_original_cwd())でmlflowが生成するmlrusのディレクトリに保存されるようにします。

また、実験名が一致するディレクトリから実験idを取得します。run_idは実験idから自動で作成されます。

そのあとは、各ハイパーパラメータの値や、メトリックスの値をclientのメソッド方に渡してあげるだけです。 ついでにhydraの出力ファイルもアーティファクトの方に保存できるようにしています。

実行結果

下記コマンドでmlflowのUIが描画されます。内部ではreact.jsを用いています。

mlflow ui

f:id:udnp:20210306162823j:plain
mlflow

以前にoptunaを試した例も載っていますが、計算結果がすべて保存されていることが分かります。

Scatter Plot, Contour Plot, Parallel Cordinates Plotも見れるようになっています。非常に便利です。

f:id:udnp:20210306163201j:plain
mlflow2

最後に

mlflowとhydraを使ってハイパーパラメータ探索を行いモデル管理まで行いました。効率よく機械学習モデルを作成して、デプロイまで考えるとなるとこういった技術は早く使えこなせるようになっていきたいと思います。

保存先もfileではなくてデータベースに保存したり、分散学習した場合のハイパーパラメータ管理など、こちらも手を付けていきたいと思います。

参考にさせていただいたサイト