より多くの人に見てもらいたいと思いQiitaで投稿しましたが、こちらにも載せておきます。
本ブログでは、より専門的な内容を記載します。 Qiitaではカジュアルな内容を投稿しています。
より多くの人に見てもらいたいと思いQiitaで投稿しましたが、こちらにも載せておきます。
本ブログでは、より専門的な内容を記載します。 Qiitaではカジュアルな内容を投稿しています。
久しぶりの記事です。
オフライン強化学習を真面目に使いこなしていきたい。
ということでオフライン強化学習の中では基本的な手法であるFitted Q-iterationについてみていきます。D. Ernstらによって2005年に提案されています。
手法理解を優先とするため厳密さに欠けるところがあると思いますが、ご容赦ください。 Neural fitted Q-iterationやDeep Q-networkの基礎となっている手法です。
強化学習(reinforcement learning)は、エージェントが環境とのやり取りを行いながら、累積報酬が最大になるような方法を求める機械学習手法です。
はじめに強化学習の設定を定義します。時刻$t$における状態$s_t$, 行動$a_t$とすると、エージェントが行動$a_t$を行なったとき、環境は、次の状態$s_{t+1}$, 報酬$r_{t}=R(s_t, a_t)$及び終端条件を返し、かつマルコフ決定過程に従います。 エージェントの方策(policy)を$\pi(a_t | s_t)$と表現します。この方策は、機械学習モデルで表現されることが多いですが、どのようなモデルでも利用できます。
次に、現在の状態や行動に対する期待報酬の予測値を表す「価値」を導入します。ある状態での状態価値$V^{\pi}(s)$は次のように定義されます。
$$ V^{\pi}(s)=\mathbb{E}\left[\sum_{k=0}^{\infty} \gamma^{k} r_{t+k+1} | s_t = s \right] $$
ここで、$\gamma$は割引率で、将来の報酬ほど価値を減衰させるパラメータです。 方策$\pi$に従って行動した時の状態$s$における期待報酬を表しています。これは状態価値(state value)と呼ばれます。一方、状態と行動に対する価値は
$$ Q^{\pi}(s, a)=\mathbb{E}\left[\sum_{k=0}^{\infty} \gamma^{k} r_{t+k+1} | s_t = s, a_t=a \right] $$
と表すことができ、状態行動価値(state-action value)もしくはQ関数と呼ばれます。 最適な価値というのはベルマン方程式を再帰的に解くことで得られることが知られています。 最適な方策を$\pi^{\ast}$としたとき、以下の式で表現されます。
$$ V^{\pi^{\ast}}(s) = \max_a \left\{r + \gamma \sum_{s'}P(s'|s, \pi(s))V^{\pi^{\ast}}(s') \right\} $$
ここで、$P$は状態遷移確率で、添え字を省略し、次の状態を$s'$として書いています。 状態遷移確率は一般的に未知であり、全ての状態に対して行動をして求めるのは困難であることが多いため、ベルマン方程式を近似する方法が提案されています。 その方法の1つに、時間的差分学習(temporal difference learning, TD learning)が知られています。これは以下の式によって価値を更新します。
$$ V(s) \leftarrow V(s) + \alpha (r + \gamma V(s') - V(s)), $$
ここで$\alpha$は学習率であり、$r + \gamma V(s')$はTD targetと呼ばれます。 この場合は1次の時間発展に対するTD targetを考慮しています。
一方、Q-learningでは、次のようになります。
$$ Q(s, a) \leftarrow Q(s) + \alpha (r + \gamma \max_a Q(s', a) - Q(s, a)), $$
ある状態$s'$の時に、全ての行動に対するQを参照し、最大のQの時の行動$a$を選択することで価値を更新していく方法です。 方策オフ型(off-policy)の更新方法です。実際には、この価値は通常、ニューラルネットワークなど何らかの形でモデル化されるため、価値関数と呼びます。
最適なQ関数を求めるために、Q-learningを行っていけば良いのですが、Q関数を近似するモデルを考える場合、誤差関数を定義する必要があります。 多くの場合、Q targetとTD targetとの二乗誤差が最小になるように価値関数を構築しますが、これはmean square Bellman error (MSBE)と呼ばれます。 すなわち、以下の式で表されます。
$$ L = \mathbb{E} \left[ (Q_{target} - (r + \gamma \max_{a'} Q)^{2} \right] $$
この時、求めるべき回帰モデルが$Q=f(s, a)$となります。 実際に探索と活用を行いながら、この回帰モデルを学習することで、Q関数を求めることができるようになります。
例えば、DQNでは、Q関数はニューラルネットワークで表現されますが、学習を安定させるため、以下のような工夫を行なっています。
現在では、DQNをより安定に、さらに性能を良くするための工夫を施した論文が数多く提案されています。
時代が遡ることになりますが、ここからがfitted Q-iteration (FQI)の解説です。実際には、上記のもので解説はほとんど終わっており、DQNでモデルを決定木やRandom forestのようなモデルにしたものがFQIです。ニューラルネットワークにしたものはNeural fitted Q-iteration (NFP)と呼ばれています。オフライン強化学習(バッチ強化学習)では、得られたデータのみから強化学習モデルを構築します。すなわち、経験再生バッファが更新されない状況です。
アルゴリズムはFigure 1のようになっています。xが状態、uが行動となっている点に注意です。
ここでは実装面からFQIを見ていきます。
本来は実験などによって得られたものを使いますが、ここでは、FQIの実装確認のため、Q-learningを使ってデータを収集します。 通常、Q-learningでは状態を離散化し、Q table形式で学習をしていきます。FQIではQ関数がモデル化されるため連続値で利用できます。
import gym rom collections import Counter, defaultdict from queue import Queue from typing import Tuple import numpy as np class QTable: def __init__(self, env, num_digit=6, init_qtable="random"): """ Observation bounds: cart_x = (-2.4, 2.4) cart_v = (-3.0, 3.0) pole_angle = (-0.2094, 0.2094) # in radians, which is approx. (-12°, 12°) pole_v = (-2.0, 2.0) """ self.env = env self.num_digit = num_digit if init_qtable == "random": self.q_table = np.random.uniform( low=0, high=1, size=(num_digit ** env.observation_space.shape[0], env.action_space.n), ) else: self.q_table = np.zeros( shape=(num_digit ** env.observation_space.shape[0], env.action_space.n) ) self.bound = np.array([[-2.4, 2.4], [-3.0, 3.0], [-0.2, 0.2], [-2.0, 2.0]]) self.bins_list = [self.create_bins(x, y) for x, y in self.bound] self.shape = self.q_table.shape def create_bins(self, low, high): """Utility function to create bins.""" return np.linspace(low, high, self.num_digit + 1)[1:-1] def digitize(self, observation: np.ndarray) -> int: """Returns discrete state from observation. This method digitizes the continuous state into discrete state. """ digit = [np.digitize(obs, lst) for obs, lst in zip(observation, self.bins_list)] # convert n-digit to 10-digit ids = sum([dig * (self.num_digit**i) for i, dig in enumerate(digit)]) return ids def __getitem__(self, idx): return self.q_table[idx] def __setitem__(self, key, value): self.q_table[key] = value def __repr__(self): return f"{self.__class__.__name__}(env={self.env}, num_digits={self.num_digit}, q_table={self.q_table})" class Action: def __init__(self, env): self.num_action = env.action_space.n def greedy(self, q_table: QTable, state: int) -> int: return int(np.argmax(q_table[state])) def epsilon_greedy(self, q_table: QTable, state: int, episode: int) -> int: epsilon = 0.5 * (1 / (episode + 1)) if np.random.uniform(0, 1) > epsilon: action = self.greedy(q_table, state) else: action = np.random.choice(self.num_action) return action class QLearning: def __init__(self, env, num_digit, alpha=0.5, gamma=0.99, init_qtable="random"): self.action = Action(env) self.q_table = QTable(env, num_digit=num_digit, init_qtable=init_qtable) self.alpha = alpha self.gamma = gamma def update(self, state, action, reward, next_state) -> None: """Off-policy update Temporal difference target: TD = reward_{t} + \gamma * max_a Q(s_{t+1}, a) Update rule: Q(s_t, a_t) := Q(s_t, a_t) + \alpha * ( TD - Q(s_t, a_t) ) """ state = self.q_table.digitize(state) next_state = self.q_table.digitize(next_state) cur_q = self.q_table[state, action] td_target = reward + self.gamma * max(self.q_table[next_state, :]) self.q_table[state, action] = cur_q + self.alpha * (td_target - cur_q) def compute_action(self, observation, episode: int) -> int: state = self.q_table.digitize(observation) return self.action.epsilon_greedy( q_table=self.q_table, state=state, episode=episode )
次にCartPoleをプレイするためのクラスを構築します。
class Agent: def __init__(self, env, episode=500, horizon=200): self.env = env self.episode = episode self.horizon = horizon self.episode_buffer = defaultdict(list) def compute_reward( self, done: bool, step: int, complete_episodes: int ) -> Tuple[float, int]: """Custom reward function""" if done: if step < 195: reward = -1.0 complete_episodes = 0 else: reward = 1.0 complete_episodes += 1 else: reward = 0.0 return reward, complete_episodes def play_episodes(self, algo): complete_episodes = 0 for episode in range(self.episode): state = self.env.reset() total_reward = 0.0 for step in range(self.horizon): action = algo.compute_action(state, episode=episode) next_state, reward, done, _ = self.env.step(action) total_reward += reward my_reward, complete_episodes = self.compute_reward( done, step, complete_episodes=complete_episodes ) if episode >= 200: self.episode_buffer[episode].append( (state, action, next_state, my_reward) ) algo.update(state, action, my_reward, next_state) if done: if episode % 100 == 0: print(f"episode={episode}, total_reward={total_reward}") break state = next_state if complete_episodes >= 10: print("10 times successes") break env = gym.make("CartPole-v0") q_learning = QLearning(env, num_digit=9, alpha=0.6, gamma=0.99) agent = Agent(env, episode=1000) agent.play_episodes(q_learning)
self.replay_buffer内に(状態、行動、次の状態、報酬)の軌跡(trajectory)を貯められるようにしています。 ステップ数が195回以下なら報酬が-1、それ以外は0、成功したら報酬が1となるようなスパースな報酬設定です。 ある程度、パフォーマンスの良い軌跡をバッファーに格納したいため、エピソードが200以上の軌跡を保存しています。
ここからは一つ一つ実行しながら見ていきます。 計算が高速なLightGBMを利用します。
初期のモデルには(状態、行動)から報酬を予測するモデルを作ります。
import pandas as pd from lightgbm import LGBMRegressor cols = ["state", "action", "next_state", "reward"] episode_list = [] for i in range(len(agent.episode_buffer)): tmp = pd.DataFrame(agent.episode_buffer[i], columns=cols) episode_list.append(tmp) data = pd.DataFrame(pd.concat(episode_list), columns=cols)) states = np.vstack(data["state"].to_numpy()) actions = data["action"].to_numpy() X = np.c_[states, actions.reshape(-1, 1)] y = data["reward"].to_numpy() model = LGBMRegressor() model.fit(X, y)
このモデルの出力結果は以下のようになります。(状態、行動)からは正しく報酬を予測できるモデルができているとは言い難い見た目になっています。
import matplotlib.pyplot as plt plt.scatter(reward, model.predict(X))
先ほど収集したデータを使ってQ targetを算出します。 得られたものを使ってQ関数(LightGBM)を学習します。
def compute_q_target(args): row, model, gamma = args next_state = np.array(row["next_state"]) q_values_next_state = [ model.predict(np.append(next_state, a).reshape(1, -1))[0] for a in [0, 1] ] max_q_value_next_state = max(q_values_next_state) return row["reward"] + gamma * max_q_value_next_state def get_q_target(data, model, gamma=0.99, n_jobs=1): with Pool(n_jobs) as p: args_list = [(row, model, gamma) for _, row in data.iterrows()] q_targets = list(p.map(compute_q_target, args_list)) return q_targets # Fitted Q-iteration q_target_list = [] rmse_list = [] for i in tqdm(range(10)): q_targets =get_q_target(data, model, gamma=0.99, n_jobs=8) model.fit(X, q_targets) q_target_list.append(q_targets) if i > 0: rmse = np.sqrt(mean_squared_error(q_targets, q_target_list[i-1])) rmse_list.append(rmse) print("RMSE of Q:", round(rmse, 4))
plt.scatter(q_targets, model.predict(X))
結果を見るとQ関数を予測できていそうです。誤差(MSBE)も下がっています。
最後は得られたモデルを使って実際にCartPoleをプレイします。
total_reward_list = [] for i in tqdm(range(100)): env = gym.make("CartPole-v0") #env.seed(i) obs = env.reset() total_reward = 0.0 q_values_list = [] while True: q_values = [model.predict(np.append(obs, a).reshape(1, -1))[0] for a in [0, 1]] q_values_list.append(q_values) action = np.argmax(q_values) obs, reward, done, info = env.step(action) total_reward += reward if done: break total_reward_list.append(total_reward) print(np.mean(total_reward_list),"+/-", round(np.std(total_reward_list), 4)) > 95.23 +/- 3.187
実際に得られたiterationを20回程度行ったモデルを使うと、ある程度連続して成功できるようなQ関数(LightGBM)が学習できました。 初期の収集したデータに依存して結果はばらつきそうです。200回連続して得られるような場合もありましたが、大体は上記のような結果です。
FQIは非常に単純ながらQ関数を近似することができ、環境とのやり取りが行えないよう状況では強力なオフライン強化学習手法の1つです。しかしながら、オフライン強化学習全般の欠点として、Q値の過大評価があります。Conservative Q-learing (CQL)のような方法だと正則化などを 導入し、この問題を軽減することができています。FQIではモデルが木構造のものを用いるので、見たことのない状況(O.O.D)では、ワークしないと思われます。ニューラルネットワークを利用したFQIの方が良いと思いますが、学習の不安定性が残ります。とりあえずオフライン強化学習をするならCQLを行うのが良いと思います。
最近、自宅PCとして強いマシンを購入(intel i9 12900KS, DDR4 3200 128GB, RTX3090)しました。累計52万円とかなりお高い(電気代も高い)ですが、非常に快適です。
ゲームなどもしたいのでWindows 10にしましたが、せっかく良いGPUを積んでいるので、WSL2にpytorch環境を作ろうと思います。WSL2への接続方法は、以下の記事を参考にしています。
WSL2はWSL1とネットワーク構成が異なり、直接sshをするのが困難です。ポートフォワーディングを行う必要があります。
WSL2のインストールは公式ドキュメントに従って行うが良いです。何も考えずにWindowsストアからUbuntuを入れるとWSL1が入るかと思います。自分はそこでWSL1からWSL2にアップグレードする際に、詰まり時間をロスしました。カーネルのアップグレードが必要となります。
WSL2が入ったらopenssh-server
の設定をしておきます。
sudo apt update sudo apt install openssh-server
あとは公開鍵の設定などをしてログインできるようにしておきます。WSLの場合は
ssh user@localhost
でもログインできるようになります。
WSLのCUDA Tool kitはNvidia公式から入れることができます。
CUDA Toolkit 11.6 Update 2 Downloads | NVIDIA Developer
次に管理者権限でWindows側のPowershellを開きます。Ubuntu-20.04の場合を記載します。wsl -l -v
で自身のバージョンは確認できます。
wsl -d Ubuntu-20.04 -u root exec service ssh start $IP=wsl -d Ubuntu-20.04 exec hostname -I netsh.exe interface portproxy reset netsh.exe interface portproxy add v4tov4 listenport=22 connectaddress=$IP connectport=22 netsh.exe interface portproxy add v4tov4 listenport=8888 connectaddress=$IP connectport=8888 # IP Helperサービスの起動 sc.exe config iphlpsvc start=auto # ブート時の自動設定 sc.exe start iphlpsvc # サービスの起動
1行目でWSLのopen-sshserverを起動します。 2行目でWSLの内部IPを取得します。WSLを起動しなおすと毎回IPが異なってしまうため、そのたびにIPを取得する必要があります。 3行目はポートフォワーディングをリセットします。これは以下のように直接削除してもよいかと思います。
netsh.exe interface portproxy delete v4tov4 listenport=22 listenaddress=*
4, 5行目でポート22や8888のポートを聞き、WSL2のアドレス:ポートに転送してくれます。そのため、外部PCからWindows側にsshやhttp://PC名:8888にアクセスするだけでWSL側につなぐことができます。その際の注意点はWindows側でファイアウォールを開けておくことです。以下のコマンドで何が有効か確認できます。
PS C:\Users\name> netsh interface portproxy show all ipv4 をリッスンする: ipv4 に接続する: Address Port Address Port --------------- ---------- --------------- ---------- * 8888 172.21.119.130 8888 * 22 172.21.119.130 22 PS C:\Users\admin>
6, 7行目のコマンドでポートフォワーディングが有効になります。このスクリプトをportforward_wsl.ps1
とし、タスクスケジューラーでブート後に実行するようにすれば完成です。正しく設定されていれば、WSL2のubuntuに接続できるようになります。
C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe -ExecutionPolicy RemoteSigned ".\portfoward_wsl.ps1
上記のようにしたところWindowsからWSLへの自動ポートフォワーディングとバッティングして上手くいかず、最終的に以下のようにしました。
$HOST_IP=192.168.1.2 # 例 netsh.exe interface portproxy reset netsh.exe interface portproxy add v4tov4 listenaddress=$HOST_IP listenport=22 connectaddress=localhost connectport=22 netsh.exe interface portproxy add v4tov4 listenaddress=$HOST_IP listenport=8888 connectaddress=localhost connectport=8888
WSL2の機能として、localhostに接続すると自動でWSL2のIPに転送してくれる機能がデフォルトでTrueとなっています。例えば、Jupyter notebookなどはhttp://localhost:8888
を入力すると自動でhttp://172.31.236.31:8888
のところに転送してくれているため、このアドレスに到達することができています。この172.31.236.31
はWSL2の内部IPになっています。結局、色々試しましたが、接続先IPからlocalhostに飛ばすようにしてあげればよいだけでした。これだと動的に変わるWSL2のIPを意識する必要はなくなりそうです。このスクリプトはタスクスケジューラに追加します。
NeurIPS 2021のOutstanding papersの1つであるOn the Expressivity of Markov Rewardを理解したいと思いつつ、実験を通じて色々理解を深めようとしてきました(全然できていません)。 論文の内容としては、マルコフ決定過程(MDP)において、3通りのタスクの形で表現でき、そのとき
いかなる報酬関数によっても実現できないタスクがある
仮に報酬関数が存在するならば、多項式時間で報酬設計問題を解くことができる
ということを示しています。マルコフ報酬の理論体系の提案と、上記の証明、実際のタスクにおいて本手法を使った報酬設計を行うと既存のQ-learningよりも早く収束することを示しています。
論文としては非常に洗練されており、自分もこのような形で書けたら良いなと思う次第です。
[論文] arxiv.org
[ブログ] deepmind.com
MDPの仮定の下で、3つのタスクを1. Set of Acceptable Polices (SOAP), 2. Policy Ordering (PO), 3. Trajectory Ordering (TO)として表しています。基本的にマルコフ報酬の下ではこの形で表現できます。
引用元:DeepMindブログ
この3つのタスクの下で、「報酬」によってタスクを表現できるか研究しています。
例) 10ステップ以内にゴールに到達しよう
$$ V^{\pi_g} (s_0) \gt V^{\pi_b} (s_0) $$
初期状態の価値が大きいものが良くなります(g=good, b=bad)。方策の集合で状態価値の初期値が最も良いのを選択する場合がSOAPに対応しています。MDPでよく使われる下図でみてみます。何度か行動を行って得られた方策を使い、$V(s_0)$の値が最も大きいものを使用します。
たとえば、ある方策(ランダム)、時間割引率0.99、スリップなしで行動が行われた時の状態価値は以下の通りです。
1 | 2 | 3 | 4 | |
---|---|---|---|---|
1 | 0.9801 | 0.99 | 1.0 | goal |
2 | 0.9702 | wall | 0.99 | fire |
3 | 0.9605 | 0.9702 | 0.9801 | 0.9702 |
5は壁、7は火(報酬-1)、11はゴール(報酬+1)です。この通りに行動を行うならば$s_0 \rightarrow s_1 \rightarrow s_2 \rightarrow s_6 \rightarrow s_{10}$ もしくは $s_0 \rightarrow s_4 \rightarrow s_8 \rightarrow s_9 \rightarrow s_{10}$となります。
例) 5ステップでゴールに到達するのが望ましい、もしくは10ステップ以内、もしくは気にしない。
$$ V^{\pi_1} (s_0) \gt V^{\pi_2} (s_0) \gt \ldots $$
SOAPでは良い方策か悪い方策かの集合を考えていましたが、POではその中でも順序を考えます。
例) 安全にゴールに到達して危ない所は避けるのが望ましい。
$$ G(τ_1; s_0) \gt G(τ_2; s_0) \ldots $$
累積報酬が最大なものの方が良い。
さて、上図の環境で強化学習を行います。ベルマン方程式を考えると、累積報酬の期待値が最大となる価値の推定はDynamic Programmingを行えば得られます。
最適ベルマン方程式の状態価値関数の更新式
$$ V(s) = \sum P(s'|s, \pi(s)) (R(s, \pi(s), s') + \gamma V(s')) $$
更新ルールは状態$s$において行動$a \sim \pi(\cdot| s)$を選択したときの報酬と、$s'$の状態価値の値を使って更新していきます。 状態遷移が複数ある場合では、各遷移確率分の値の総和を取ります。
から行動 に対して遷移が観測された場合、
$$ V(s_0) = \frac{s_1}{s_1 + s_2 } (R(s_0, a_0, s_1) + \gamma V(s_1)) + \frac{s_2}{s_1 + s_2 } (R(s_0, a_0, s_2) + \gamma V(s_2)) $$
となります。状態遷移確率は基本的に不明なため、ここでは到達回数の比率で計算します。 更新はバックワードに行われます。これは状態価値・行動価値関数の推定どちらでも同じものになります。
Value iterationでは、最後まで行動を行い価値関数を更新します。 一方、Temporal difference (TD) learningに代表されるQ-learningでは、行動を最後まで行わなくても価値関数を更新できる方法です。
TDのターゲットは以下の式になります。 $$ \text{TD}_\text{target} = R(s_t, \pi(s_t)) + \gamma V(s_{t+1}) $$
Q-learningの状態価値の推定は、行動価値関数の最大値を使います。
$$ V(s_{t+1}) = \max_{a} Q(s_{t+1}, a) $$
SARSAでは、方策(状態価値)を使って行動し、その時の状態価値を使います。
$$ V(s_{t+1}) = Q(s_{t+1}, \pi(s_t)) $$
このように価値の推定の仕方にはバリエーションがあります。そしてベルマン方程式を解くためにTD targetと、現在の価値との差、TD誤差を使って更新を行っていきます。
$$ Q(s_t, a_t): =Q(s_t, a_t) + \alpha (\text{TD}_\text{target}- Q(s_t, a_t)) $$
αは学習率で、大きいほどTD誤差の影響が大きくなります。 TD誤差が大きい場合は、その行動はより価値が高い方更新されます。TD(0) learningとも呼ばれます。
Grid world (FrozenLake-v0)でValue iterationとQ-learningを試した例は以下になります。
value iterationでは報酬テーブル、価値テーブル、状態遷移が必要になるため、これらを辞書形式で保存するようにします。
import gym from collections import Counter, defaultdict class Agent: def __init__(self, env): self.env = env self.transitions = defaultdict(Counter) # (state, action) self.reward_table = defaultdict(float) # (state, action, next_state) self.value_table = defaultdict(float) # (state, (action)) def set_state_action(self, state, action, reward, next_state): self.reward_table[state, action, next_state] = reward self.transitions[state, action][next_state] += 1
状態遷移確率は上述したようにパスを通った回数の比率で算出するためCounter
で求めていきます。1ステップごとに(state, action, reward, next_state)
を記録するようにします。
状態価値の推定は$V(s) = \sum P (s' |s, a) ( R(s, a, s') + \gamma V(s))$にて行います。その式が以下の通りです。ある状態から次の状態に変化したときの価値を求め更新してきます。
def compute_state_value(self, state, action): target_counts = self.transitions[state, action] total = sum(target_counts.values()) state_value = 0.0 for target_state, count in target_counts.items(): reward = self.reward_table[state, action, target_state] state_value += (count / total) * ( reward + self.gamma * self.value_table[target_state] ) return state_value
最終的に得られた値をもとに状態価値を更新します。 観測された全ての状態に対して価値を求め、その中で最大のものが状態価値となり、これを繰り返していくのがvalue iterationです。 行動を行うたびに前の状態価値が伝播され最初の状態価値が求まっていきます。
def value_iteration(self): for state in range(self.env.observation_space.n): state_values = [ self.compute_state_value(state, action) for action in range(self.env.action_space.n) ] self.value_table[state] = max(state_values)
Qに対してもvalue iterationを使うことができます。違いは最大となるQの値を使って行動を行っていくため、行動に対する各価値も推定していきます。
TD learningでのQ-learningでは状態遷移が必要となりません。そのため必要なのは価値テーブルだけです。モデルフリーの学習になります。
class Qlearning2: def __init__(self, env, gamma=0.99, alpha=0.9): self.env = env self.gamma = gamma self.alpha = alpha self.state = self.env.reset() self.value_table = defaultdict(float) def sample(self) -> Tuple[np.ndarray, int, float, np.ndarray]: action = self.env.action_space.sample() cur_state = self.state new_state, reward, done, _ = self.env.step(action) self.state = self.env.reset() if done else new_state return cur_state, action, reward, new_state def update(self, state, action, reward, next_state): max_q, _ = self.best_value_action(next_state) td_target = reward + self.gamma * max_q cur_q = self.value_table[state, action] self.value_table[state, action] = cur_q + self.alpha * (td_target - cur_q) def best_value_action(self, state): best_action, best_value = None, None for action in range(self.env.action_space.n): action_value = self.value_table[state, action] if best_value is None or best_value < action_value: best_value = action_value best_action = action return best_value, best_action def run_episode(self, env): total_reward = 0.0 state = env.reset() while True: _, action = self.best_value_action(state) new_state, reward, done, _ = env.step(action) total_reward += reward if done: break state = new_state return total_reward
sample()
では、ランダムの行動を通して(state, action, reward, next_state)のペアを集めていきます。update()
ではQ-learningの更新ルールに従って価値テーブルを更新します。best_value_action()
では価値テーブルの中から、とある状態のとき最大の価値とその行動を返します。CartPoleでもそうですが、報酬設計をうまくしないと価値テーブルの更新がうまくいきません。
main.py
に実験コードを記載しています。
import gym from grid_world.agent import Qlearning, Vlearning from grid_world.env import CustomMaze if __name__ == "__main__": env = gym.make("FrozenLake-v1") # env = CustomMaze() test_env = gym.make("FrozenLake-v1") agent = Vlearning(env, gamma=0.99, epsilon=0.0) # agent = Qlearning(env, gamma=0.99, epsilon=0.0) iter_n = 0 best_reward = 0.0 test_episode = 20 while True: iter_n += 1 agent.play_random_steps(100) agent.value_iteration() reward = 0.0 for _ in range(test_episode): reward += agent.play_episode(test_env) reward /= test_episode if reward > best_reward: print(f"Best reward {best_reward:.3f} -> {reward:.3f}") best_reward = reward if reward > 0.8: print(f"Solved: {iter_n} iterations. Best reward: {best_reward}.") break
Best reward 0.000 -> 0.500 Best reward 0.500 -> 0.650 Best reward 0.650 -> 0.700 Best reward 0.700 -> 0.750 Best reward 0.750 -> 0.800 Best reward 0.800 -> 0.950 Solved: 8 iterations. Best reward: 0.95.
改めて実装するとvalue iterationやQ-learningで忘れていたところを思い出せたような気がします。次回(できたら)は、方策勾配法(PG)、信頼領域を使った方策最適化(TRPO)、近接方策最適化(PPO)などを実装して理解を深めていきたいと思います。 PG, TRPO, PPOは方策にニューラルネットを使うためRllibの勉強もかねて、サンプルコードをなぞりながら実装していきたいです。
Rayは分散処理を計算するためのAPIです。その中でも特にRLlibは強化学習に特化したライブラリになっています。
シミュレーション環境さえ用意できれば、強化学習はいかに並列計算を行うかが大事になってきます。 Open MPIが有名かと思いますが、Rayを使えばノード間分散処理といった面倒な実装のところも簡単に実装することができます。
非常に簡単です。例えばCartPoleのタスクをPPOを32並列、1GPUで実行するには以下のようにconfigを設定します。
from ray.rllib.agents.ppo import PPOTrainer # Configure the algorithm. config = { "env": "CartPole-v1", "num_workers": 32, "num_gpus": 1, "framework": "torch", # or "tf" "model": { "fcnet_hiddens": [64, 64], "fcnet_activation": "relu", }, "evaluation_num_workers": 1, "evaluation_config": { "render_env": False, } } # Create our RLlib Trainer. trainer = PPOTrainer(config=config) for _ in range(100): print(trainer.train()) # Evaluate the trained Trainer (and render each timestep to the shell's output). trainer.evaluate()
また、DQNの優先度付経験再生のサンプリングを分散に行えるようにしたAPEX(Distributed Prioritized Experience Replay)を用いる場合も
from ray.rllib.agents.dqn import ApexTrainer trainer = ApexTrainer(config=config)
と変更するだけです。アルゴリズムは代表的なものが多数実装されています。
あとは計算資源を用意してあげるだけです。ノード間並列もray.init()にてIPを設定してあげればできます。 AWS上のEKSなどでAutoScalingもできそうですが、こちらもいずれ試していきたいと思います。
計算結果はTensorboardに記録されます。だいたい2分くらいで最大の報酬500に到達しています。
独自の環境、カスタムモデル、損失関数など拡張性ができるようになっています。まずはRayに実装されている強化学習手法を試してダメならモデルや損失関数を変更するというのがよさそうです。
強化学習の細かい設定などはTrainer configから参照できます。
https://docs.ray.io/en/latest/rllib-training.html#common-parameters
前回の記事に引き続き、MLflow用いた機械学習のライフサイクルの管理をしていきます。
今回はグリッドサーチではなくベイズ最適化によるハイパーパラメータ探索を行っていきます。また、MLflowのモデルバージョン管理機能を用いていきます。
Preferred Networkが開発しているハイパーパラメータチューニングのフレームワークです。私はRay Tuneを使うことが多いのですが、日本語によるサンプルコードが多いのと社内でよく使われることもあって試してみます。mlflowとも連携ができますが、各runごとにモデルのアーティファクトを保存したい場合、少し使いにくいです。
LightGBMのハイパーパラメータを探索します。まずは、目的関数を設定します。
def objective(trial): # CVの方が良いが簡単のためホールドアウト data, target = sklearn.datasets.load_breast_cancer(return_X_y=True) train_x, test_x, train_y, test_y = train_test_split(data, target, test_size=0.25) dtrain = lgb.Dataset(train_x, label=train_y) # ここはhydraで管理した方が良いかも? # https://github.com/optuna/optuna/tree/master/examples/hydra param = { "objective": "binary", "metric": "binary_logloss", "verbosity": -1, "boosting_type": "gbdt", "lambda_l1": trial.suggest_loguniform("lambda_l1", 1e-8, 10.0), "lambda_l2": trial.suggest_loguniform("lambda_l2", 1e-8, 10.0), "num_leaves": trial.suggest_int("num_leaves", 2, 256), "feature_fraction": trial.suggest_uniform("feature_fraction", 0.4, 1.0), "bagging_fraction": trial.suggest_uniform("bagging_fraction", 0.4, 1.0), "bagging_freq": trial.suggest_int("bagging_freq", 1, 7), "min_child_samples": trial.suggest_int("min_child_samples", 5, 100), } gbm = lgb.train(param, dtrain) preds = gbm.predict(test_x) pred_labels = np.rint(preds) accuracy = sklearn.metrics.accuracy_score(test_y, pred_labels) return accuracy
次に、mlflowのコールバック関数を記載します。
def mlflow_callback(study, trial): trial_value = trial.value if trial.value is not None else float("nan")) with mlflow.start_run(experiment_id=experiment_id, run_name=study.study_name): mlflow.log_params(trial.params) mlflow.log_metric("accuracy", trial_value)
各トライアルにおいてハイパラとメトリックの結果を保存するように設定します。あとは実行するだけです。
if __name__ == "__main__": # 実験名と同じ階層の実験idを抽出する experiment_name = 'optuna' client = mlflow.tracking.MlflowClient(tracking_uri=mlflow.get_tracking_uri()) for exp in client.list_experiments(): if experiment_name == exp.name: experiment_id = exp.experiment_id break else: experiment_id = client.create_experiment(experiment_name) # ベイズ最適化の実行 study = optuna.create_study(direction="maximize") study.optimize(objective, n_trials=100, callbacks=[mlflow_callback]) print("Number of finished trials: {}".format(len(study.trials))) print("Best trial:") trial = study.best_trial print(f"Value: {trial.value}") print("Params:") for key, value in trial.params.items(): print(f"{key}: {value}")
実行結果は以下のようになります。
[I 2021-03-08 22:56:28,200] A new study created in memory with name: no-name-84a20927-29be-4994-bd3b-cc311d81fd17 [I 2021-03-08 22:56:28,247] Trial 0 finished with value: 0.972027972027972 and parameters: {'lambda_l1': 4.215060077417707e-05, 'lambda_l2': 0.015415839449936278, 'num_leaves': 238, 'feature_frac tion': 0.6673400621108638, 'bagging_fraction': 0.7442018650062918, 'bagging_freq': 2, 'min_child_samples': 41}. Best is trial 0 with value: 0.972027972027972. [I 2021-03-08 22:56:28,487] Trial 1 finished with value: 0.958041958041958 and parameters: {'lambda_l1': 4.533990836656474, 'lambda_l2': 0.8259430522162956, 'num_leaves': 56, 'feature_fraction': 0.9744784080586081, 'bagging_fraction': 0.5414331193881199, 'bagging_freq': 3, 'min_child_samples': 9}. Best is trial 0 with value: 0.972027972027972.
得られた実行結果は次のように保存されています。
さて、得られたデータからモデルのバージョン管理をしていきます。
ここから少し手順が面倒になってきます。というのもartifactとmodelのメタ情報は別々に保存していく必要があるからです。
MLflow Trackingを見るとArtifactを保存するパターンは4つほどあります
シナリオ1の場合は、特に何も考えずに実行したらMLflow側でmlrunsのフォルダを作成してくれます。experiment_idを指定してあげれば実験ごとにフォルダを作成してくれます。 ここでは、シナリオ2の、localhost + SQLiteの場合を試してみます。
ここで必要なのはSQLAlchemyです。データベースのスキーマはMLflow側で作成してくれるので、空のDBを作成します。
from sqlalchemy import create_engine engine = create_engine('sqlite:///mlruns.db', echo=True)
次にMLflowのサンプルコードを実行しています。
def run(): # Create two runs Log MLflow entities mlflow.set_tracking_uri('sqlite:///mlruns.db') with mlflow.start_run() as run1: params = {"n_estimators": 3, "random_state": 42} rfr = RandomForestRegressor(**params).fit([[0, 1]], [1]) mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model") with mlflow.start_run() as run2: params = {"n_estimators": 6, "random_state": 42} rfr = RandomForestRegressor(**params).fit([[0, 1]], [1]) mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model") # Register model name in the model registry name = "RandomForestRegression" client = mlflow.tracking.MlflowClient() client.create_registered_model(name) # 名前が重複する場合はエラーになるので例外処理を追加しておく try: client.get_registered_model(name) except: client.create_registered_model(name) # Create a two versions of the rfr model under the registered model name for run_id in [run1.info.run_id, run2.info.run_id]: model_uri = "runs:/{}/sklearn-model".format(run_id) mv = client.create_model_version(name, model_uri, run_id) print("model version {} created".format(mv.version)) print("--") # Fetch the last version; this will be version 2 mv = client.get_model_version(name, mv.version) print_model_version_info(mv)
これが行っているのは、各runごとにRandomForestを作成して、各パラメータと学習済みモデルを保存しています。それをmlflow.tracking.MlflowClient()
を使って、モデルを登録していきます。ここで大事なのは、初めにmlflow.set_tracking_uri('sqlite:///mlruns.db')
でtracking先をdbにしていることです。
ER図を見てもわかるようにartifact_uriがモデルを参照する場所になっています。今回ではローカルに保存していますが、AWS S3などにも簡単に保存できるようになっています。
面倒なのは、client
とmlflow.run_start()
です。clientの場合はrun_idを作成できるのですが、フォルダが作成されないので、最初にwith mlflow.run_start():を実行してあげる必要があります。この操作を行わないとくMLflowのUI上からアーティファクトを参照することができなくないため、エラーとなっててしまいます。
optunaの場合はベストなハイパラを使って再度学習させる必要があるので、最後にモデルを保存するようなコードを追加すればokです。
if __name__ == '__main__': data = sklearn.datasets.load_breast_cancer(return_X_y=False) train_x, test_x, train_y, test_y = train_test_split(data.data, data.target, test_size=0.25) dtrain = lgb.Dataset(train_x, label=train_y) # optunaで得られたベストなパラメータを使って学習 lgb_model = lgb.train(trial.params, dtrain) preds = lgb_model.predict(test_x) pred_labels = np.rint(preds) accuracy = sklearn.metrics.accuracy_score(test_y, pred_labels) # データのスキーマを登録する df = pd.DataFrame(data.data, columns=data.feature_names) signature = infer_signature(df, preds) with mlflow.start_run(experiment_id=experiment_id) as run: mlflow.log_params(trial.params) mlflow.log_metric('accuracy', accuracy) mlflow.lightgbm.log_model(lgb_model, artifact_path='lightgbm-model', signature=signature) name = 'LightGBM' tags = {'data': 'breast cancer'} try: client.get_registered_model(name) except: client.create_registered_model(name) run_id = run.info.run_id model_uri = "runs:/{}/lightgbm-model".format(run_id) mv = client.create_model_version(name, model_uri, run_id, tags=tags) print("model version {} created".format(mv.version))
このようにすることでモデルのバージョン管理を行うことができます。モデルの精度が悪化してきた場合は、このスクリプトを実行することで継続的にメンテナンスできます。
mlflowはモデルをデプロイする機能もあり、REST APIで予測結果を返すこともできます。仮想環境やDocker環境を自動で構築してくれます。
mlflow serve models -m <モデルのディレクトリ>
予測を行う場合はPOSTを行います。
import requests url = 'http://127.0.0.1:5000/invocations' headers = {'Content-Type': 'application/json'; 'format': 'pandas-split'} r = requests.post(url, df.to_json('split'), headers)
もしくは、単純に以下の形式でもよいです。
curl -X POST -H "Content-Type:application/json; format=pandas-split" --data "{\"columns\":[\"alcohol\", \"chlorides\", \"citric acid\", \"density\", \"fixed acidity\", \"free sulfur dioxide\", \"pH\", \"residual sugar\", \"sulphates\", \"total sulfur dioxide\", \"volatile acidity\"],\"data\":[[12.8, 0.029, 0.48, 0.98, 6.2, 29, 3.33, 1.2, 0.39, 75, 0.66]]}" http://127.0.0.1:1234/invocations
モデルの読み込みも簡単です。
loaded_model = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path)
各MLflowが提供しているModelAPIを使ってもよいと思います。
MLflowを使うとモデル管理を本格的に行うことができます。Microsoft Azure MLやAmazon SageMaker等にデプロイができるようなので、次回はSageMaker (or Kubernetes)でデプロイをやってみようと思います。
実際の業務でモデル選定やハイパラ調整で時間を大きく割くのは無駄(記述子・特徴量生成の方が大事)なので、簡単なテーマならさくっとモデルを構築してすぐプロダクトを提供できるような形にしていきたいです。
機械学習を使ったプロジェクトに携わると、データ解析をしていくにつれて大量のモデルが構築され、これらのモデルを管理するだけでも大変です。IT企業のようなソフトウェアエンジニアリングの部署がある企業では、各社ベストプラクティスがあるのだと思いますが、私の場合、メーカー勤務かつ部署が研究開発ということもあり、ソフトウェアエンジニアリングのノウハウを学ぶことはできず、手探りで探索しているような状況です。となるとOSSを使うことになるのですが、機械学習のライフサイクル管理もかねてmlflowを本格的に使っていきたいと思います。
また、facebook researchが作成しているHydraと呼ばれるconfig管理フレームワークを導入します。これはargparseで書いていた部分をyamlで書くことで簡略化できるようになっています。今まではスクリプト上で煩雑だったものが、yamlで管理できるため非常に見やすくなります。
これら両方を用いて、ElasticNetのハイパーパラメータをグリッドサーチしていきたいと思います。
pip install hydra-core --upgrade
まずはconfig.yaml
に設定ファイルを記載していきます。
params: alpha: 0.5 l1_ratio: 0.5 file: path: 'sklearn_elasticnet_wine/winequality-red.csv'
次に、python側では以下のようにして読み込みます。
import hydra from omegaconf import DictConfig, OmegaConf @hydra.main(config_name='config') def main(cfg: DictConfig) -> None: print(OmegaConf.to_yaml(cfg)) if __name__ == "__main__": my_app()
コマンドライン引数を読み込みたい関数にhydraをデコレートしてあげます。実行すると以下のようになります。
$ my_app.py params: alpha: 0.5 l1_ratio: 0.5 file: path: 'sklearn_elasticnet_wine/winequality-red.csv'
オーバーライドをするのも簡単です。
$ my_app.py params.alpha=1.0 params.l1_ratio=1.0 params: alpha: 1.0 l1_ratio: 1.0 file: path: 'sklearn_elasticnet_wine/winequality-red.csv'
コマンドライン引数を変更した場合はoutputs
のディレクトリが自動で生成されログが出力されます。
.hydra ├── config.yaml ├── hydra.yaml └── overrides.yaml
また、複数の引数を実行することもできます。
$ my_app.py params.alpha=0.1,0.5,1.0 params.l1_ratio=0.1,0.5,1.0 [2021-03-06 15:49:32,780][HYDRA] Launching 9 jobs locally [2021-03-06 15:49:32,781][HYDRA] #0 : params.alpha=0.1 params.l1_ratio=0.1 [2021-03-06 15:49:32,979][HYDRA] #1 : params.alpha=0.1 params.l1_ratio=0.5 [2021-03-06 15:49:33,156][HYDRA] #2 : params.alpha=0.1 params.l1_ratio=1.0 [2021-03-06 15:49:33,351][HYDRA] #3 : params.alpha=0.5 params.l1_ratio=0.1 [2021-03-06 15:49:33,535][HYDRA] #4 : params.alpha=0.5 params.l1_ratio=0.5 [2021-03-06 15:49:33,722][HYDRA] #5 : params.alpha=0.5 params.l1_ratio=1.0 [2021-03-06 15:49:33,919][HYDRA] #6 : params.alpha=1.0 params.l1_ratio=0.1 [2021-03-06 15:49:34,095][HYDRA] #7 : params.alpha=1.0 params.l1_ratio=0.5 [2021-03-06 15:49:34,285][HYDRA] #8 : params.alpha=1.0 params.l1_ratio=1.0
このようにすることでグリッドサーチによるハイパーパラメータ探索のコードを簡単に記述することができます。
mlflowを用いれば各実行条件ごとのメトリック、モデル、ハイパーパラメータを適切にログを取ることができ、可視化をすることができます。今回はグリッドサーチですが、optunaのようなベイズ最適化を用いた探索の場合では、非常に簡単にログを取ることができます。
pip install mlflow
import os from random import random, randint import mlflow if __name__ == '__main__': with mlflow.start_run(run_id=0): mlflow.log_param("param1", randint(0, 100)) for i in range(3): mlflow.log_metric("foo", random() + float(i)) # Log an artifact (output file) if not os.path.exists("outputs"): os.makedirs("outputs") with open("outputs/test.txt", "w") as f: f.write("hello world!") mlflow.log_artifacts("outputs")
各計算結果をlog_metric
やlog_param
等で保存していきます。run_idは自動で生成されますが自分でつけることもできます。
ネットで調べてみると、グリッドサーチを試してみたという例は複数ありました。私もすごく単純な例で試していきます。
ベースはmlflow/examples/sklearn_elasticnet_wineのサンプルコードです。 https://github.com/mlflow/mlflow/tree/master/examples/sklearn_elasticnet_wine
import os import warnings import logging import pandas as pd import numpy as np from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score from sklearn.model_selection import train_test_split from sklearn.linear_model import ElasticNet import mlflow import hydra from omegaconf import DictConfig logging.basicConfig(level=logging.WARN) logger = logging.getLogger(__name__) EXPERIMENT_NAME = 'elasticnet_wine' def eval_metrics(actual, pred): rmse = np.sqrt(mean_squared_error(actual, pred)) mae = mean_absolute_error(actual, pred) r2 = r2_score(actual, pred) return {'rmse': rmse, 'mae': mae, 'r2': r2} def fetch_data(cfg: DictConfig): data = pd.read_csv(hydra.utils.to_absolute_path(cfg.file.path), sep=";") train, test = train_test_split(data) # The predicted column is "quality" which is a scalar from [3, 9] train_x = train.drop(["quality"], axis=1) test_x = test.drop(["quality"], axis=1) train_y = train[["quality"]] test_y = test[["quality"]] return train_x, test_x, train_y, test_y
まずは基本的なパッケージを読み込みます。
mlflowで使う実験名も最初に指定しています。データやモデルの条件が変わった場合は実験名を変えるのかなと思います。
メトリックを計算する関数と、データを読み込む関数を指定します。hydraの注意点ですが、実行時にパスが自動的に変更されてしまいますので、hydra.utils.to_absolute_path()
を使って実行時のパスを取得してあげる必要があります。
@hydra.main(config_name='config') def main(cfg: DictConfig): warnings.filterwarnings('ignore') np.random.seed(40) hydra_path = os.getcwd() train_x, test_x, train_y, test_y = fetch_data(cfg) alpha = float(cfg.params.alpha) l1_ratio = float(cfg.params.l1_ratio) lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42) lr.fit(train_x, train_y) predicted_qualities = lr.predict(test_x) metrics = eval_metrics(test_y, predicted_qualities) # MLFLOW os.chdir(hydra.utils.get_original_cwd()) client = mlflow.tracking.MlflowClient() # データごとにExperimentを作成 for exp in client.list_experiments(): if EXPERIMENT_NAME == exp.name: experiment_id = exp.experiment_id break else: experiment_id = client.create_experiment(EXPERIMENT_NAME) run_id = client.create_run(experiment_id).info.run_id for k, v in cfg.params.items(): client.log_param(run_id, k, v) for k, v in metrics.items(): client.log_metric(run_id, k, v) hydra_files = ['.hydra/config.yaml', '.hydra/hydra.yaml', '.hydra/overrides.yaml'] for hydra_file in hydra_files: client.log_artifact(run_id, os.path.join(hydra_path, hydra_file)) with mlflow.start_run(run_id=run_id): mlflow.sklearn.log_model(lr, "model") client.set_terminated(run_id)
注意点は、先ほどと同様にhydraが自動でログを作成するのでパスが変わってしまうことです。
os.chdir(hydra.utils.get_original_cwd())
でmlflowが生成するmlrusのディレクトリに保存されるようにします。
また、実験名が一致するディレクトリから実験idを取得します。run_idは実験idから自動で作成されます。
そのあとは、各ハイパーパラメータの値や、メトリックスの値をclientのメソッド方に渡してあげるだけです。 ついでにhydraの出力ファイルもアーティファクトの方に保存できるようにしています。
下記コマンドでmlflowのUIが描画されます。内部ではreact.jsを用いています。
mlflow ui
以前にoptunaを試した例も載っていますが、計算結果がすべて保存されていることが分かります。
Scatter Plot, Contour Plot, Parallel Cordinates Plotも見れるようになっています。非常に便利です。
mlflowとhydraを使ってハイパーパラメータ探索を行いモデル管理まで行いました。効率よく機械学習モデルを作成して、デプロイまで考えるとなるとこういった技術は早く使えこなせるようになっていきたいと思います。
保存先もfileではなくてデータベースに保存したり、分散学習した場合のハイパーパラメータ管理など、こちらも手を付けていきたいと思います。