グラフ機械学習と強化学習について

主にグラフ機械学習や強化学習手法を記載します。

MDP: Value Iteration

NeurIPS 2021のOutstanding papersの1つであるOn the Expressivity of Markov Rewardを理解したいと思いつつ、実験を通じて色々理解を深めようとしてきました(全然できていません)。 論文の内容としては、マルコフ決定過程(MDP)において、3通りのタスクの形で表現でき、そのとき

  • いかなる報酬関数によっても実現できないタスクがある

  • 仮に報酬関数が存在するならば、多項式時間で報酬設計問題を解くことができる

ということを示しています。マルコフ報酬の理論体系の提案と、上記の証明、実際のタスクにおいて本手法を使った報酬設計を行うと既存のQ-learningよりも早く収束することを示しています。

論文としては非常に洗練されており、自分もこのような形で書けたら良いなと思う次第です。

[論文] arxiv.org

[ブログ] deepmind.com

タスクとは?

MDPの仮定の下で、3つのタスクを1. Set of Acceptable Polices (SOAP), 2. Policy Ordering (PO), 3. Trajectory Ordering (TO)として表しています。基本的にマルコフ報酬の下ではこの形で表現できます。

f:id:udnp:20220220220739p:plain

引用元:DeepMindブログ

この3つのタスクの下で、「報酬」によってタスクを表現できるか研究しています。

Set of Acceptable Polices

例) 10ステップ以内にゴールに到達しよう

$$ V^{\pi_g} (s_0) \gt V^{\pi_b} (s_0) $$

初期状態の価値が大きいものが良くなります(g=good, b=bad)。方策の集合で状態価値の初期値が最も良いのを選択する場合がSOAPに対応しています。MDPでよく使われる下図でみてみます。何度か行動を行って得られた方策を使い、$V(s_0)$の値が最も大きいものを使用します。

f:id:udnp:20220220231417p:plain

たとえば、ある方策(ランダム)、時間割引率0.99、スリップなしで行動が行われた時の状態価値は以下の通りです。

1 2 3 4
1 0.9801 0.99 1.0 goal
2 0.9702 wall 0.99 fire
3 0.9605 0.9702 0.9801 0.9702

5は壁、7は火(報酬-1)、11はゴール(報酬+1)です。この通りに行動を行うならば$s_0 \rightarrow s_1 \rightarrow s_2 \rightarrow s_6 \rightarrow s_{10}$ もしくは $s_0 \rightarrow s_4 \rightarrow s_8 \rightarrow s_9 \rightarrow s_{10}$となります。

Policy Ordering

例) 5ステップでゴールに到達するのが望ましい、もしくは10ステップ以内、もしくは気にしない。

$$ V^{\pi_1} (s_0) \gt V^{\pi_2} (s_0) \gt \ldots $$

SOAPでは良い方策か悪い方策かの集合を考えていましたが、POではその中でも順序を考えます。

Trajectory Ordering

例) 安全にゴールに到達して危ない所は避けるのが望ましい。

$$ G(τ_1; s_0) \gt G(τ_2; s_0) \ldots $$

累積報酬が最大なものの方が良い。

Value Iteration

さて、上図の環境で強化学習を行います。ベルマン方程式を考えると、累積報酬の期待値が最大となる価値の推定はDynamic Programmingを行えば得られます。

最適ベルマン方程式の状態価値関数の更新式

$$ V(s) = \sum P(s'|s, \pi(s)) (R(s, \pi(s), s') + \gamma V(s')) $$

更新ルールは状態$s$において行動$a \sim \pi(\cdot| s)$を選択したときの報酬と、$s'$の状態価値の値を使って更新していきます。 状態遷移が複数ある場合では、各遷移確率分の値の総和を取ります。

 s_0から行動 a_0 に対して遷移 s_1, s_2が観測された場合、

$$ V(s_0) = \frac{s_1}{s_1 + s_2 } (R(s_0, a_0, s_1) + \gamma V(s_1)) + \frac{s_2}{s_1 + s_2 } (R(s_0, a_0, s_2) + \gamma V(s_2)) $$

となります。状態遷移確率は基本的に不明なため、ここでは到達回数の比率で計算します。 更新はバックワードに行われます。これは状態価値・行動価値関数の推定どちらでも同じものになります。

Temporal difference learning

Value iterationでは、最後まで行動を行い価値関数を更新します。 一方、Temporal difference (TD) learningに代表されるQ-learningでは、行動を最後まで行わなくても価値関数を更新できる方法です。

TDのターゲットは以下の式になります。 $$ \text{TD}_\text{target} = R(s_t, \pi(s_t)) + \gamma V(s_{t+1}) $$

Q-learningの状態価値の推定は、行動価値関数の最大値を使います。

$$ V(s_{t+1}) = \max_{a} Q(s_{t+1}, a) $$

SARSAでは、方策(状態価値)を使って行動し、その時の状態価値を使います。

$$ V(s_{t+1}) = Q(s_{t+1}, \pi(s_t)) $$

このように価値の推定の仕方にはバリエーションがあります。そしてベルマン方程式を解くためにTD targetと、現在の価値との差、TD誤差を使って更新を行っていきます。

$$ Q(s_t, a_t): =Q(s_t, a_t) + \alpha (\text{TD}_\text{target}- Q(s_t, a_t)) $$

αは学習率で、大きいほどTD誤差の影響が大きくなります。 TD誤差が大きい場合は、その行動はより価値が高い方更新されます。TD(0) learningとも呼ばれます。

サンプルコード

Grid world (FrozenLake-v0)でValue iterationとQ-learningを試した例は以下になります。

github.com

V learning

value iterationでは報酬テーブル、価値テーブル、状態遷移が必要になるため、これらを辞書形式で保存するようにします。

import gym
from collections import Counter, defaultdict

class Agent:
    def __init__(self, env):
        self.env = env
        self.transitions = defaultdict(Counter)  # (state, action)
        self.reward_table = defaultdict(float)   # (state, action, next_state)
        self.value_table = defaultdict(float)    # (state, (action))

    def set_state_action(self, state, action, reward, next_state):
        self.reward_table[state, action, next_state] = reward
        self.transitions[state, action][next_state] += 1

状態遷移確率は上述したようにパスを通った回数の比率で算出するためCounterで求めていきます。1ステップごとに(state, action, reward, next_state)を記録するようにします。

状態価値の推定は$V(s) = \sum P (s' |s, a) ( R(s, a, s') + \gamma V(s))$にて行います。その式が以下の通りです。ある状態から次の状態に変化したときの価値を求め更新してきます。

def compute_state_value(self, state, action):
        target_counts = self.transitions[state, action]
        total = sum(target_counts.values())
        state_value = 0.0
        for target_state, count in target_counts.items():
            reward = self.reward_table[state, action, target_state]
            state_value += (count / total) * (
                  reward + self.gamma * self.value_table[target_state]
         )
        return state_value

最終的に得られた値をもとに状態価値を更新します。 観測された全ての状態に対して価値を求め、その中で最大のものが状態価値となり、これを繰り返していくのがvalue iterationです。 行動を行うたびに前の状態価値が伝播され最初の状態価値が求まっていきます。

def value_iteration(self):
        for state in range(self.env.observation_space.n):
            state_values = [
                self.compute_state_value(state, action)
                for action in range(self.env.action_space.n)
            ]
            self.value_table[state] = max(state_values)

Q-learning

Qに対してもvalue iterationを使うことができます。違いは最大となるQの値を使って行動を行っていくため、行動に対する各価値も推定していきます。

TD learningでのQ-learningでは状態遷移が必要となりません。そのため必要なのは価値テーブルだけです。モデルフリーの学習になります。

class Qlearning2:
    def __init__(self, env, gamma=0.99, alpha=0.9):
        self.env = env
        self.gamma = gamma
        self.alpha = alpha
        self.state = self.env.reset()
        self.value_table = defaultdict(float)

    def sample(self) -> Tuple[np.ndarray, int, float, np.ndarray]:
        action = self.env.action_space.sample()
        cur_state = self.state
        new_state, reward, done, _ = self.env.step(action)
        self.state = self.env.reset() if done else new_state
        return cur_state, action, reward, new_state

    def update(self, state, action, reward, next_state):
        max_q, _ = self.best_value_action(next_state)
        td_target = reward + self.gamma * max_q
        cur_q = self.value_table[state, action]
        self.value_table[state, action] = cur_q + self.alpha * (td_target - cur_q)

    def best_value_action(self, state):
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.value_table[state, action]
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_value, best_action

    def run_episode(self, env):
        total_reward = 0.0
        state = env.reset()
        while True:
            _, action = self.best_value_action(state)
            new_state, reward, done, _ = env.step(action)
            total_reward += reward
            if done:
                break
            state = new_state
        return total_reward

sample()では、ランダムの行動を通して(state, action, reward, next_state)のペアを集めていきます。update()ではQ-learningの更新ルールに従って価値テーブルを更新します。best_value_action()では価値テーブルの中から、とある状態のとき最大の価値とその行動を返します。CartPoleでもそうですが、報酬設計をうまくしないと価値テーブルの更新がうまくいきません。

結果

main.pyに実験コードを記載しています。

import gym

from grid_world.agent import Qlearning, Vlearning
from grid_world.env import CustomMaze

if __name__ == "__main__":
    env = gym.make("FrozenLake-v1")
    # env = CustomMaze()

    test_env = gym.make("FrozenLake-v1")
    agent = Vlearning(env, gamma=0.99, epsilon=0.0)
    # agent = Qlearning(env, gamma=0.99, epsilon=0.0)
    iter_n = 0
    best_reward = 0.0
    test_episode = 20
    while True:
        iter_n += 1
        agent.play_random_steps(100)
        agent.value_iteration()

        reward = 0.0
        for _ in range(test_episode):
            reward += agent.play_episode(test_env)
        reward /= test_episode
        if reward > best_reward:
            print(f"Best reward {best_reward:.3f} -> {reward:.3f}")
            best_reward = reward
        if reward > 0.8:
            print(f"Solved: {iter_n} iterations. Best reward: {best_reward}.")
            break
Best reward 0.000 -> 0.500
Best reward 0.500 -> 0.650
Best reward 0.650 -> 0.700
Best reward 0.700 -> 0.750
Best reward 0.750 -> 0.800
Best reward 0.800 -> 0.950
Solved: 8 iterations. Best reward: 0.95.

最後に

改めて実装するとvalue iterationやQ-learningで忘れていたところを思い出せたような気がします。次回(できたら)は、方策勾配法(PG)、信頼領域を使った方策最適化(TRPO)、近接方策最適化(PPO)などを実装して理解を深めていきたいと思います。 PG, TRPO, PPOは方策にニューラルネットを使うためRllibの勉強もかねて、サンプルコードをなぞりながら実装していきたいです。