Python テンソルフローで保存した回帰モデルの読み出しと実行

 本記事では回帰モデル「拡張子.h5」を読み出して、予測に使用する方法について記載する。本プログラムをコピーして例えば、「zzz_load_NNmodel.py」のような適当な名前で保存して実行するだけ。

 実行後は下図のように入力因子(設計変数)Xデータと回帰モデルを実行して得た出力指標(目的関数)Yデータの予測結果を右端の列に_predYで、csvファイルで出力する仕様である。

f:id:HK29:20191117134717p:plain

■本プログラム

特記事項

①回帰モデルの読み込みから実行までの流れは下記1~4でシンプルです

 1. model.compile()でモデルを作成

 2. model.load_weights(load_model)で拡張子「.5h」の学習器モデルファイルを読み込む

 3. これから検証したい入力Xデータをmodel.predictにぶち込んで実行する

 4. 実行結果は画像やcsvファイルで保存されます

②pd.DataFrame形式で行方向にappendで追加する場合には、同時に新規に代入作成する必要がある

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os, glob, pathlib
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation, Dropout, Flatten, Dense

from sklearn.model_selection import train_test_split
from sklearn import datasets
import time

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns

boston = datasets.load_boston()
x = boston.data
x_df = pd.DataFrame(x, columns=boston.feature_names)
y = boston.target
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.3)
X_train_df = pd.DataFrame(X_train, columns=boston.feature_names)
X_test_df = pd.DataFrame(X_test, columns=boston.feature_names)

def build_model(n_features, myLoss, myOptimizer):
    model = Sequential()
    model.add(Dense(units=1000, input_dim=n_features, activation='relu'))
    model.add(Dense(units=800, activation='relu'))
    model.add(Dropout(rate=0.5))
    model.add(Dense(units=100, activation='relu'))
    model.add(Dense(1))
    
    model.compile(loss=myLoss, optimizer=myOptimizer, metrics=['mean_absolute_error'])
    model.summary()

    return model

def run(load_model, n_features, myLoss, check_name, all_optimizer, i=1):
    name = check_name[2:]
    print(name)
    ### load model
    optimizer = all_optimizer[name]
    model = build_model(n_features, myLoss, optimizer)
    print(load_model)
    model.load_weights(load_model)

    ### evaluate
    buf = []
    out_loss, out_metric = model.evaluate(x, y)
    print(name)
    print('loss:{0:.3f}'.format(out_loss))
    print('metrics:', out_metric)
    buf = [name, str(out_loss), str(out_metric)]
    out_data = ','.join(buf)
    with open("load_00_out_data_" + name + ".csv", 'a') as f:
        if i == 1:
            f.write("name, loss, metrics, time" + '\n')
        f.write(out_data + '\n')

    ### predict
    yp1 = model.predict(X_train) #.flatten()
    yp2 = model.predict(X_test) #.flatten()
    
    ### plot graph
    plot_scatter(name, yp1, yp2, i)
    
    ### save csv
    X_train_df["PRICE_pred"] = yp1
    X_test_df["PRICE_pred"] = yp2
    DF1 = X_train_df
    print("1")
    print(DF1)
    DF2 = DF1.append(X_test_df)
    print("2")
    print(DF2)
    
def plot_scatter(name, yp1, yp2, i=1):
    plt.figure(dpi=120)
    plt.title('load' + ". " + name, fontsize=14)
    plt.xlabel('RM', fontsize = 16)
    plt.ylabel('PRICE', fontsize = 16)
    plt.xlim(3.3, 9.3)
    plt.ylim(0, 53)
    plt.scatter(X_train_df.loc[:,'RM'], yp1, facecolors='none', edgecolors='r', label=name + "_pred1")
    plt.scatter(X_test_df.loc[:,'RM'], yp2, facecolors='none', edgecolors='m', label=name + "_pred2")
    plt.legend(loc='lower right', fontsize=14)
    plt.grid(True)
    #plt.show
    plt.savefig('load_' + check_name + '_scatter.png')
    plt.close()

def get_path_ckpt(check_name):
    checkpoint_path = os.path.join("ckpt", check_name + "_cp-{epoch:03d}.ckpt")
    print(checkpoint_path)
    checkpoint_dir = os.path.dirname(checkpoint_path)
    checkpoints_all_list = pathlib.Path(checkpoint_dir).glob(r"**\*.index")
    checkpoints_list = []
    for p in checkpoints_all_list:
        #print(p)
        if check_name in repr(p):
            abs_p = p.resolve()
            #print(abs_p)
            checkpoints_list.append(abs_p)
    print(checkpoints_list)
    
    return checkpoints_list

def get_path_h5(check_name):
    checkpoint_path = os.path.join(check_name + "_model.h5")
    #print(p)
    #checkpoint_path = p.resolve()
    #print(checkpoint_path)
    
    return checkpoint_path

def main(n_features, myLoss, check_name, all_optimizer):
    '''
    checkpoints_list = get_path_ckpt(check_name)
    i=1
    for load_model in checkpoints_list:
        run(load_model, n_features, myLoss, check_name, all_optimizer, i)
        i += 1
    '''
    load_model = get_path_h5(check_name)
    run(load_model, n_features, myLoss, check_name, all_optimizer)

if __name__ == "__main__":
    ### parameter
    n_features = 13
    myLoss = 'mean_squared_error'
    check_name = "2_Adagrad"
    all_optimizer = {
        #'SGD': tf.keras.optimizers.SGD(lr=0.01, momentum=0.0, decay=0.0, nesterov=False),
        'RMSprop': tf.keras.optimizers.RMSprop(lr=0.001, rho=0.9, epsilon=None, decay=0.0),
        'Adagrad': tf.keras.optimizers.Adagrad(lr=0.01, epsilon=None, decay=0.0),
        'Adadelta': tf.keras.optimizers.Adadelta(lr=1.0, rho=0.95, epsilon=None, decay=0.0),
        'Adam': tf.keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False),
        'Adamax': tf.keras.optimizers.Adamax(lr=0.002, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0),
        'Nadam': tf.keras.optimizers.Nadam(lr=0.002, beta_1=0.9, beta_2=0.999, epsilon=None, schedule_decay=0.004),
    }    
    
    ### call function
    main(n_features, myLoss, check_name, all_optimizer)
    
    print('#####\nfinished')
    

●参考資料

hk29.hatenablog.jp

メモ:
pathlibモジュールのglob()で**とすると、サブフォルダも再帰的に走査する。
**はフォルダにのみマッチする。
**/*はファイルにもマッチする。

以上

<広告>