Python ベイズ最適化によるハイパーパラメータの調整「Bayesian Optimization×TensorFlow」

 本記事では、テンソルフローで作成するニューラルネットワーク回帰分析のハイパーパラメータを調整する雛形コードを載せました。Pythonベイズ最適化ライブラリはいくつかある中で、本記事では「Bayesian Optimization」を用いました。GitHub - fmfn/BayesianOptimization: A Python implementation of global optimization with gaussian processes.

インストールは次のようにcondaで出来ます。

conda install -c conda-forge bayesian-optimization

本コードの特徴を次の通りです。隠れ層の数を予め[3,5,10]などとリストで指定しておくことで、各々の場合のハイパーパラメータ(Hyper Parameter)を求めます。ハイパーパラメータは、ニューロン数、学習率、ドロップ率の3つとしています。最適化する損失関数(loss)は、平均二乗誤差(Mean Squared Error, MSE)としました。別の指標にしたければ指定可能です。
下図例は、本コード「zzz_bayesian.py」を実行して作成された生成物です。

f:id:HK29:20200106174532p:plain

下図のように、最適化過程の履歴をcsvファイル「*_result.csv」に保存します。

f:id:HK29:20200106174634p:plain

下図のように、最適化結果をcsvファイル「*_result_max.csv」に保存します。

f:id:HK29:20200106174703p:plain

下図はベイズ最適化の過程をグラフ化したものです。縦軸は評価指標MSE(平均2乗誤差)で小さい程良いです。隠れ層の数を増やせば当てはまりが良くなるとは必ずしも言えない結果です。

f:id:HK29:20200106182947p:plain

 対策案として、調整するハイパーパラメータの項目を増やしたり、探索範囲を広めて行うこともあるが、解析時間が増えるデメリットがあります。他の案としては、データ前処理として標準化した方が良いとされます。理由は、隠れ層間の重み(感度)を同レベルにするためです。

 NN(Neural Network)の場合、パラメータが増えると爆発的に探索範囲が増えるため非常に時間を要することになります。今回の計算で15分前後掛かかりました。参考までに、同データを回帰分性する他の手法として、ランダムフォレストやLightGBM等の決定木系の非線形回帰分析では数十秒で分析が完了します。

■本プログラム
本コードを実行すると、一旦、隠れ層別に「pbounds_module_3layers.py」などのファイル名でベイズ最適化のコードをモジュールファイルで出力します。そして、それを実行する仕様です。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from bayes_opt import BayesianOptimization
import os, sys
import importlib

import pandas as pd
import numpy as np
import tensorflow as tf

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

from sklearn.datasets import load_boston
boston = load_boston()
# Xデータを抽出(Pandasのデータフレーム形式)
X_df = pd.DataFrame(boston.data, columns=boston.feature_names)
# Yデータを抽出(Pandasのシリーズ形式)
Y_s = pd.Series(boston.target)

# ベイズ最適化実行用クラスをモジュール「pbounds_module.py」で作成する関数
def create_build_model_module(n_layers):

    ### ハイパーパラメータを辞書で作成する
    # はじめにkeyをリストで作成
    key_list = ["neuron_{:02d}".format(i+1) for i in range(n_layers)]
    key_list.append("learning_rate") # リストに要素を追加。appendメソッド
    key_list.append("drop_rate")
    # 次にvalueをリストで作成
    value_list = [str(neuron_range) for i in range(n_layers)]
    value_list.append(learning_rate_range)
    value_list.append(drop_rate_range)
    # 最後にハイパーパラメータを辞書で作成
    pbounds_dic = dict(zip(key_list, value_list))
    print(pbounds_dic)

    # 最適化する損失関数とベイズ最適化関数をモジュール.pyファイルで作成
    with open('pbounds_module_' + str(n_layers) + 'layers.py', 'w') as f:
        f.write('# this is pbounds_module for BayesianOptimization of NN\n')
        f.write('# Neural_Network:' + activation_function + '\n')
        f.write('# n_layers=' + str(n_layers) + '\n\n')
        f.write('import pandas as pd\n')
        f.write('import numpy as np\n')
        f.write('from tensorflow.keras.models import Sequential\n')
        f.write('from tensorflow.keras.layers import Activation, Dropout, Flatten, Dense\n')
        f.write('from tensorflow.keras.utils import plot_model\n')
        f.write('from bayes_opt import BayesianOptimization\n')
        f.write('from sklearn.model_selection import train_test_split\n\n')
        f.write('class NNmodel_for_BayesOpt:\n')
        f.write('    def __init__(self, X_df, Y_s, n_features, my_loss, my_metrics, opt_name, opt_setting, n_layers):\n')
        f.write('        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X_df, Y_s, test_size=0.3)\n')
        f.write('        self.n_features = n_features\n')
        f.write('        self.my_loss = my_loss\n')
        f.write('        self.my_metrics = my_metrics\n')
        f.write('        self.opt_name = opt_name\n')
        f.write('        self.opt_setting = opt_setting\n\n')

        f.write('    def build_model(self,' + '\n')
        for i, key in enumerate(pbounds_dic.keys()):
            if i < (len(pbounds_dic)-1):
                f.write(r"                    " + key + ',\n')
            else:
                f.write(r"                    " + key + '\n')
        f.write('                    ):' + '\n')

        f.write('        model = Sequential()' + '\n')
        for i in range(n_layers):
            key = "neuron_{:02d}".format(i+1)
            print(key)
            if (i+1) == 1:
                f.write('        model.add(Dense(units=int(' + key + "), activation=" + activation_function + ', input_dim=self.n_features))' + '\n')
            elif (i+1) % 3 == 0:
                f.write('        model.add(Dense(units=int(' + key + "), activation=" + activation_function + "))" + '\n')
                f.write('        model.add(Dropout(rate=drop_rate))' + '\n')
            else:
                f.write('        model.add(Dense(units=int(' + key + "), activation=" + activation_function + "))" + '\n')
        f.write('        model.add(Dense(1))' + '\n')
        buf_list = [
            '        model.compile(loss=self.my_loss, optimizer=self.opt_setting, metrics=self.my_metrics)',
            r'        plot_model(model, to_file = self.opt_name + "_' + str(n_layers) + 'layers_model.jpg", show_shapes=True)',
            '',
            '        print("#### fit")',
            '        model.fit(self.X_train, self.y_train, epochs=' + str(epochs) + ', batch_size=' + str(batch_size) + ', verbose=0)',
            '',
            '        print("### evalute")',
            '        out_loss_and_metrics = model.evaluate(self.X_test, self.y_test, batch_size = ' + str(batch_size) + ', verbose=1)',
            '',
            '        return (-1)*out_loss_and_metrics[0]'
        ]
        buf_list_str = '\n'.join(buf_list)
        f.write(buf_list_str + '\n\n')

        f.write('    def bayesOpt(self):' + '\n')
        f.write('        pbounds = {' + '\n')
        i = 1
        for key, value in pbounds_dic.items():
            if i < len(pbounds_dic):
                f.write(r"            '" + key + "':" + str(value) + ',\n')
            else:
                f.write(r"            '" + key + "':" + str(value) + '\n')
            i += 1
        f.write('        }' + '\n\n')
        f.write('        optimizer = BayesianOptimization(f = self.build_model, pbounds = pbounds)' + '\n')
        f.write('        optimizer.maximize(init_points=' + str(initial_points) + ', n_iter=' + str(n_iteration) \
                + ', acq=' + acquisition_function + ')' + '\n\n')
        f.write('        return optimizer' + '\n')

# 分析結果をcsvファイルで出力する関数
def to_csv(opt_name, activation_function, my_loss, n_layers, result):
    print("n_layers -> " + str(n_layers))
    # 履歴をcsvファイルで保存
    print("--- result.res ---")
    print(result.res)
    for i, my_result in enumerate(result.res):
        value_list = [opt_name]
        out_socre = (-1)*my_result.get('target')
        value_list.append(str(out_socre))
        result_params_dict = my_result.get('params')
        if i==0:
            key_list = ['opt_name']
            key_list.append(my_loss) # my_lossは'target'のこと
            for my_key, my_value in result_params_dict.items():
                key_list.append(my_key)
                if my_key.endswith('_rate'):
                    # 語尾が_rateの場合、文字列型に変換してからリスト入れる
                    value_list.append(str(my_value))
                else:
                    # 語尾が_rateでない場合、一旦整数にしてから文字列型に変換
                    value_list.append(str(int(my_value)))
            key_str = ','.join(key_list)
            value_str = ','.join(value_list)
            with open(opt_name + '_' + activation_function + '_' + str(n_layers) + 'layers_result.csv', 'w') as f:
                f.write(key_str + '\n')
                f.write(value_str + '\n')
        else:
            for my_key, my_value in result_params_dict.items():
                if my_key.endswith('_rate'):
                    value_list.append(str(my_value))
                else:
                    value_list.append(str(int(my_value)))
            value_str = ','.join(value_list)
            with open(opt_name + '_' + activation_function + '_' + str(n_layers) + 'layers_result.csv', 'a') as f:
                f.write(value_str + '\n')

    # 最適結果をcsvファイルで出力
    print("--- result.max ---")
    print(result.max)
    key_list = ['opt_name']
    value_list = [opt_name]
    key_list.append('target')
    out_socre = (-1)*result.max.get('target')
    value_list.append(str(out_socre))
    result_params_dict = result.max.get('params')
    print(result_params_dict)
    for my_key, my_value in result_params_dict.items():
        key_list.append(my_key)
        if my_key.endswith('_rate'):
            value_list.append(str(my_value))
        else:
            value_list.append(str(int(my_value)))
    key_str = ','.join(key_list)
    value_str = ','.join(value_list)
    with open(opt_name + '_' + activation_function + '_' + str(n_layers) + 'layers_result_max.csv', 'w') as f:
        f.write(key_str + '\n')
        f.write(value_str + '\n')

# メイン関数。隠れ層別、更に指定したオプティマイザ別に順次処理してゆく。
def main():
    for n_layers in n_layers_list:
        # ベイズ最適化実行用「pbounds_module.py」ファイル作成
        create_build_model_module(n_layers)
        # 作成したモジュールをインポートする
        # 動的にモジュール名が変更する場合、importlib.import_moduleでモジュールを文字列で指定できる。
        my_pbounds = importlib.import_module('pbounds_module_' + str(n_layers) + 'layers')
        my_pbounds = importlib.reload(my_pbounds)
        print(my_pbounds)
        i = 1
        # オプティマイザー毎に処理する
        for opt_name, opt_setting in all_optimizer.items():
            print(i)
            print(opt_name)
            print(opt_setting)
            # 自作モジュール「my_pbounds」のNNmodel_for_BayesOptクラスからインスタンス「model」の生成
            model = my_pbounds.NNmodel_for_BayesOpt(X_df, Y_s, n_features, my_loss, my_metrics, opt_name, opt_setting, n_layers)
            result = model.bayesOpt() # ベイズ最適化を実行して結果を変数「result」へ格納
            to_csv(opt_name, activation_function, my_loss, n_layers, result) # csvファイルへ出力する関数
            i += 1

# 以下で、パラメータ設定をする
if __name__ == "__main__":
    ### parameter「普通のパラメータ」
    n_features = 13
    my_loss = 'mean_squared_error' # 最適化する損失関数を指定 model.compile(loss= で定義
    my_metrics = ['mean_squared_error', 'mean_absolute_error'] #,
    #                       'mean_absolute_percentage_error', 'mean_squared_logarithmic_error',
    #                       'squared_hinge', 'hinge', 'categorical_hinge', 'logcosh']

    # 隠れ層はリストで指定する。それぞれに対するハイパーパラメータを求める
    n_layers_list = [3, 4, 5] # この場合、隠れ層3、5、10の場合を順次試行する

    ### Hyper parameter「ハイパーパラメータ」
    # (下限, 上限)で設定する。float型の連続値として選択される。
    neuron_range = (10, 1000) # 隠れ層のニューロン数。層別に異なるニューロン数を本コード中でint型に変換している
    learning_rate_range = (0.001, 1) # 学習率
    drop_rate_range = (0.2, 0.5) # ドロップ率

    # 以下は予め指定する
    activation_function = "'relu'" # 活性化関数:非線形候補例→ relu tanh sigmoid
    epochs = 100 # 1世代試行する時の学習回数
    batch_size = 20 # (1世代中の)1回学習時に処理する学習データのひと塊の数
    # 1epochでNNの重みを更新する回数 = epochs / batch_size

    # 「Bayesian Optimization」のパラメータ
    initial_points = 5 # ベイズで初回に実行するランダムポイントの数
    acquisition_function = "'ucb'" # ベイズの獲得関数:候補例→ ei poi
    n_iteration = 25 # ベイズで試行する回数

    ### オプティマイザ
    learning_rate = learning_rate_range[0] # 学習率の定義で、一旦ダミー変数を入れとく
    all_optimizer = {
        #'SGD': tf.keras.optimizers.SGD(lr=learning_rate, momentum=0.0, decay=0.0, nesterov=False),
        'RMSprop': tf.keras.optimizers.RMSprop(lr=learning_rate, rho=0.9, epsilon=None, decay=0.0),
        #'Adagrad': tf.keras.optimizers.Adagrad(lr=learning_rate, epsilon=None, decay=0.0),
        #'Adadelta': tf.keras.optimizers.Adadelta(lr=learning_rate, rho=0.95, epsilon=None, decay=0.0),
        #'Adam': tf.keras.optimizers.Adam(lr=learning_rate, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False),
        #'Adamax': tf.keras.optimizers.Adamax(lr=learning_rate, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0),
        #'Nadam': tf.keras.optimizers.Nadam(lr=learning_rate, beta_1=0.9, beta_2=0.999, epsilon=None, schedule_decay=0.004),
    }

    ### call function
    main()

    print('finished')

下記コードは、上記の本コード実行により作成されたベイズ最適化するモジュールの例である。

# this is pbounds_module for BayesianOptimization of NN
# Neural_Network:'relu'
# n_layers=3

import pandas as pd
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation, Dropout, Flatten, Dense
from tensorflow.keras.utils import plot_model
from bayes_opt import BayesianOptimization
from sklearn.model_selection import train_test_split

class NNmodel_for_BayesOpt:
    def __init__(self, X_df, Y_s, n_features, my_loss, my_metrics, opt_name, opt_setting, n_layers):
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X_df, Y_s, test_size=0.3)
        self.n_features = n_features
        self.my_loss = my_loss
        self.my_metrics = my_metrics
        self.opt_name = opt_name
        self.opt_setting = opt_setting

    def build_model(self,
                    neuron_01,
                    neuron_02,
                    neuron_03,
                    learning_rate,
                    drop_rate
                    ):
        model = Sequential()
        model.add(Dense(units=int(neuron_01), activation='relu', input_dim=self.n_features))
        model.add(Dense(units=int(neuron_02), activation='relu'))
        model.add(Dense(units=int(neuron_03), activation='relu'))
        model.add(Dropout(rate=drop_rate))
        model.add(Dense(1))
        model.compile(loss=self.my_loss, optimizer=self.opt_setting, metrics=self.my_metrics)
        plot_model(model, to_file = self.opt_name + "_3layers_model.jpg", show_shapes=True)

        print("#### fit")
        model.fit(self.X_train, self.y_train, epochs=100, batch_size=20, verbose=0)

        print("### evalute")
        out_loss_and_metrics = model.evaluate(self.X_test, self.y_test, batch_size = 20, verbose=1)

        return (-1)*out_loss_and_metrics[0]

    def bayesOpt(self):
        pbounds = {
            'neuron_01':(10, 1000),
            'neuron_02':(10, 1000),
            'neuron_03':(10, 1000),
            'learning_rate':(0.001, 1),
            'drop_rate':(0.2, 0.5)
        }

        optimizer = BayesianOptimization(f = self.build_model, pbounds = pbounds)
        optimizer.maximize(init_points=5, n_iter=25, acq='ucb')

        return optimizer

 ●関連資料

下記リンクは、scikit-learnによるNN回帰分析でライブラリは「optuna」を使用した例です。この例では、隠れ層の数や活性化関数もハイパーパラメータとしています。TensorFlowとoptunaの組み合わせが使いやすい。

hk29.hatenablog.jp

以上

<広告>