Python 回帰モデルの最小値or最大値をベイズ最適化により探索する「PyCaret×GPyOpt」

 本記事では、回帰モデル(目的関数)の最小値もしくは最大値をベイズ最適化のライブラリ「GPyOpt」で探索する雛形コードを載せました。最適結果に加えて、最適解の探索過程の目的変数と説明変数の履歴もcsvファイルに保存する仕様です。

 はじめに、回帰モデルを作成するには機械学習フレームワーク「PyCaret」を用います。その雛形コードは次のリンク先です。Python 「PyCaret」ベストな回帰モデルを自動選定するautoml() - PythonとVBAで世の中を便利にする ここで作成されたベスト回帰モデルはオブジェクトファイルであり、例えば「best_model_201121.pkl」といったファイル名で保存されます。

 本コードでは上記で作成した回帰モデルのオブジェクトファイルを読み込み、各説明変数を探索する上下限の範囲を設定します。更に、必要であれば制約条件を設定します。制約条件とは、例えば、x0 + x1 <=1 などといった説明変数間の従属性です。分析するデータによりはしますが、自然界において説明変数間が互いに独立であることはまれです。制約条件を設定しない場合、実際には各々の説明変数間で同時には取りえない値を探索し、その結果が最適解となり得ることを頭に入れておく必要があります。

本コードを実行すると、次のような5つのファイルを生成します。

f:id:HK29:20201122234548p:plain

「00_optimisation_data.csv」の中身は下図です。最適化の結果をcsvファイルに保存します。回帰モデル、つまり目的関数の最小値もしくは最大値の最適化結果が「Label」です。それより左の列はその時の各説明変数の値です。

f:id:HK29:20201122234657p:plain

「01_history_197.png」は下図です。探索履歴を図示しています。横軸と縦軸は説明変数代表2つで、プログラム中で指定する必要があります。ここで、本例題ではボストン住宅価格のデータセットを用いています。回帰モデル(目的関数)は住宅価格で安い条件を探索しました。つまり、最小値の最適化です。この例の場合、横軸「RM」は部屋数で縦軸「LSTAT」は低所得者の割合です。左上に点が多くプロットされているのは、そこを多く探索したためです。住宅価格が安い条件は、部屋数が少なく、低所得者の割合が多い地域ということで、理屈的に妥当な結果と解釈できます。

f:id:HK29:20201122234915p:plain

「02_history_data.csv」の中身は下図です。最適化の探索履歴をcsvファイルに保存します。この例の場合、A列からM列の13列が説明変数で、N列の「Label」が目的変数です。

f:id:HK29:20201122235527p:plain

■本プログラム
Anaconda PromptのようなCUI上で実行できます。

#!/usr/bin/env python
# coding: utf-8

# In[1]:

import pandas as pd
from pycaret.regression import *
import numpy as np 
import GPy
import GPyOpt

# 保存した回帰モデルを読み込む
load_reg_model = load_model('best_model_201121') #'best_model_' + now)
# 説明変数名のリスト
columns_name_list = [
    'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX',
    'PTRATIO', 'B', 'LSTAT'
]


# 目的関数
def f(x):
    crim = x[:,0]
    zn = x[:,1]
    indus = x[:,2]
    chas = x[:,3]
    nox = x[:,4]
    rm = x[:,5]
    age = x[:,6]
    dis = x[:,7]
    rad = x[:,8]
    tax = x[:,9]
    ptratio = x[:,10]
    b = x[:,11]
    lstat = x[:,12]
    
    # データをリストに格納
    data_list = [
        crim, zn, indus, chas, nox, rm, age, dis, rad, tax,
        ptratio, b, lstat
    ]
    # データフレームを作成
    unseen_df = pd.DataFrame([data_list], columns=columns_name_list)
    # PyCaretで予測する
    predict_df = predict_model(load_reg_model, data=unseen_df)
    # 予測結果をfloat型で返す
    predict_val = predict_df['Label'].values.tolist()[0]
    
    return predict_val
    

# 説明変数xの上下限の設定
bounds = [
{'name': 'crim', 'type': 'continuous', 'domain': (0, 89)}, # 連続値で指定したい場合は、typeをcontinuousとして数値は(下限, 上限)で設定する
{'name': 'zn', 'type': 'continuous', 'domain': (0, 100)},
{'name': 'indus', 'type': 'continuous', 'domain': (0.45, 28)},
{'name': 'chas', 'type': 'discrete', 'domain': (0, 1)}, # カテゴリ変数。非連続値で指定したい場合は、typeをdiscreteとして数値を直打ちで設定する
{'name': 'nox', 'type': 'continuous', 'domain': (0.38, 0.88)},
{'name': 'rm', 'type': 'continuous', 'domain': (3.5, 8.8)},
{'name': 'age', 'type': 'continuous', 'domain': (2.9, 100)},
{'name': 'dis', 'type': 'continuous', 'domain': (1.1, 12)},
{'name': 'rad', 'type': 'discrete', 'domain': (1,2,3,4,5,6,7,8,24)},
{'name': 'tax', 'type': 'continuous', 'domain': (187, 711)},
{'name': 'ptratio', 'type': 'continuous', 'domain': (12, 22)},
{'name': 'b', 'type': 'continuous', 'domain': (0.3, 396)},
{'name': 'lstat', 'type': 'continuous', 'domain': (1.7, 38)},
]

''' 
# 制約条件を設定したい場合
constraints = [
{
"name": "constr_1",
"constraint": "(x[:,0] + x[:,1]) - 10 - 0.001" # crim + zn -10 <= 0.001 の制約設定したい場合
},
{
"name": "constr_2",
"constraint": "10 - (x[:,0] + x[:,1]) - 0.001" # crim + zn -10 >= -0.001 の制約設定したい場合
}
]
'''
flag_maximize = False # 目的関数を最大化したい場合にTrue。最小化したい場合はFalse
num = 200 # 探索回数
myBopt = GPyOpt.methods.BayesianOptimization(
    f=f,
    domain=bounds,
    #constraints = constraints, # 制約条件を設定したい場合
    maximize = flag_maximize, # when True -f maximization of f is done by minimizing -f (default, False).
    model_type = 'GP' # default 'GP'
)
# 最適化を実行する
myBopt.run_optimization(max_iter = num)
# 最適値(最小値or最大値)の抽出
if flag_maximize:
    opt_fx = myBopt.fx_opt * (-1)
else:
    opt_fx = myBopt.fx_opt
# 最適値(最小値or最大値)の時の説明変数xの値の抽出
opt_x = myBopt.x_opt
print(opt_fx)
print(opt_x)


# In[2]:


# 最適化結果をcsvファイルに保存する
columns_name_list_str = ','.join(columns_name_list)
columns_name_list_str = columns_name_list_str + ',Label' + '\n'
with open('00_optimisation_data.csv', 'w') as f:
    f.write(columns_name_list_str)
    buf_list = []
    for X in opt_x.tolist():
        buf_list.append(X)
    buf_list.append(opt_fx)
    opt_list = list(map(str, buf_list))
    opt_str = ','.join(opt_list)
    f.write(opt_str + '\n')


# In[3]:


# 説明変数xの探索履歴をグラフで可視化する
import matplotlib.pyplot as plt
import japanize_matplotlib

step = int(num/3)
my_x = 5 # x軸にする要素番号
my_y = 12 # y軸にする要素番号
my_x_lim = (0, 10) # x軸の範囲
my_y_lim = (0, 40) # y軸の範囲
my_x_name = 'RM'
my_y_name = 'LSTAT'
for i in range(step-1, num, step):
    history_X = myBopt.X[0:i]
    plt.rcParams['font.size'] = 13
    plt.figure()
    plt.title('開始から' + str(i) + '番目までの探索履歴')
    plt.xlabel(my_x_name)
    plt.ylabel(my_y_name)
    plt.xlim(my_x_lim[0], my_x_lim[1])
    plt.ylim(my_y_lim[0], my_y_lim[1])
    sc = plt.scatter(
        history_X[:,my_x], # X
        history_X[:,my_y], # Y
        s = 100,
        c = range(len(history_X)),
        cmap = "Greys")
    plt.colorbar(sc)
    plt.plot(history_X[:,my_x], history_X[:,my_y], linestyle=":", color='k')
    plt.grid()
    #plt.show()
    plt.savefig('01_history_' + str(i) + '.png')
    plt.close()


# In[4]:


# 探索履歴をcsvファイルに保存する
if flag_maximize:
    history_Y = myBopt.Y[0:num] * (-1)
else:
    history_Y = myBopt.Y[0:num]
history_Y_list = history_Y.tolist()
history_X_list = history_X.tolist()
with open('02_history_data.csv', 'w') as f:
    f.write(columns_name_list_str)
    for X, Y in zip(history_X_list, history_Y_list):
        X.extend(Y)
        buf_list = list(map(str, X))
        buf_str = ','.join(buf_list)
        f.write(buf_str + '\n')


# In[ ]:

以上

<広告>