Python 「Pytorch」によるニューラルネットワーク回帰分析

 本記事は、Pytorchのインストール方法とコードの雛形について載せました。

▼インストール手順
はじめに、下記リンクの本家webページへ飛びます。

pytorch.org

次に、リンク先で下へスクロールすると、下図のようなのが出現するため、自分の使用しているPC環境を選択する。すると、下図青選択箇所に、インストールするために適切なコマンドが出現する。これをコピペでインストールすればOK。

f:id:HK29:20200224183138p:plain

▼本記事のコード実行例

下図は学習過程を視覚化したグラフです。横軸は学習回数で、縦軸は学習指標(のひとつ)MSE(平均2乗誤差)です。右肩下がりで学習している様子がわかります。

f:id:HK29:20200224183702p:plain

下図は、学習後の適合度を可視化したグラフです。青点が生データで、赤点が学習後の予測結果です。見た目にも、そこそこ適合している様子を確認できます。

f:id:HK29:20200224183727p:plain

下図は横軸が生データで、縦軸が学習後の予測結果を可視化したグラフです。比例の傾きが1に近い程、予測の傾向を再現していることを示します。

f:id:HK29:20200224183753p:plain

■本プログラム

本コードを実行するための分析用の生データは、例えば右記リンク先で作成したcsvファイルでお試し出来ます。Python scikit-learn付属のボストン市の住宅価格データ(Boston house prices dataset)をcsvファイル化する - HK29’s blog

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# https://pytorch.org/
# https://pytorch.org/docs/stable/tensors.html
import os, sys, glob
import pandas as pd
import numpy as np
import scipy as sp
from scipy import optimize
import math

from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

import torch
from torch import nn, optim
from torch.utils.data import DataLoader
import torch.nn.functional as F

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns

from joblib import dump, load
import datetime
now = datetime.datetime.now()
now = now.strftime("%y%m%d")

######################################## csvファイルからデータをロードし前処理する
def my_load_csv(target_Yname):
    #check_flag, check_name, check_duplicates_columns = check_list
    df = pd.read_csv(file_path, skiprows=start_row - 1, encoding="utf-8") #"shift-jis")
    print("df -> " + str(df))
    print(df.dtypes) # pandasの列毎の型確認。その列に文字列があるかで決まる。但し、object型は要素毎に異なる可能性がある。
    df.replace('none', '0', inplace=True) # 置換。 inplace=Trueでdfが置換される(df=と新規に代入しなくてよい)
    df.replace('FALSE', '0', inplace=True)
    df_buf1 = df.iloc[:, :out_start_column]
    df_buf2 = df.iloc[:, out_start_column:] # ilocとreplaceは同時に使用できなかったため、一旦分割して処理する

    df_buf2.replace('0', np.nan, inplace=True) #文字列0をnumpy形式のNaNにする
    df_buf2.replace(0, np.nan, inplace=True) #数値0をnumpy形式のNaNにする

    #columns_list = df_buf2.columns
    #df_buf2[columns_list] = pd.to_numeric(df_buf2[columns_list], errors='coerce') #文字列をNaNへ置換する

    df = pd.concat([df_buf1, df_buf2], axis=1) #データフレームの列を連結して、ひとつのdfへ戻す
    df.to_csv("data_nan.csv", sep=",", header=True, index=False)
    print('check NaN -> ' + str(df.isnull().any())) # ひとつでもNaNがある列はTrue、そうでなければFalseと表示される。
    print(df.dtypes)

    if check_duplicates_columns: #指定した列名があった場合、重複行を削除する
        df2 = df.drop_duplicates(check_duplicates_columns, keep='first') #最初の行が重複とみなされずに残す
    else:
        df2 = df
    print("df2 -> " + str(df2))

    if check_flag:
        df3 = df2.dropna(subset=[target_Yname]) # NaNやNoneなどの欠損値を除外する場合
    else:
        df3 = df2[df2[target_Yname].str.contains(check_name, na=False)] # 指定文字列の行を残す場合
    print("df3 -> " + str(df3))

    if target_value2ln: # 目的変数Yを対数に変換する場合
        df4 = df3.assign(tmp = np.log(df3[target_Yname]))
        target_Yname = target_Yname + '_ln'
        tmp_columns = df4.columns.values
        print("tmp_columns -> " + str(tmp_columns))
        tmp_columns[-1] = target_Yname
        df4.columns = tmp_columns
        print("df4 -> " + str(df4))
        df4.to_csv("data_df4.csv", sep=",", header=True, index=False)
        DF = df4
    else:
        DF = df3

    x_DF = DF.iloc[:, input_columns[0]:input_columns[1]]
    print("x_DF -> " + str(x_DF))
    y = DF.loc[:,target_Yname]
    x_DF_columns = x_DF.columns
    #print("x_DF_columns -> " + str(x_DF_columns))
    DF.to_csv("data_DF.csv", sep=",", header=True, index=False)
    #print('check2 NaN -> ' + str(DF.isnull().any()))

    return x_DF, y, x_DF_columns

######################################## 説明変数Xを標準化する場合
def my_StandardScaler(X_df):
    scaler = StandardScaler()
    x_sc = scaler.fit_transform(X_df)  
    # sklearnの上記のようなfit関連の実行後、データの書式がnumpy形式となりカラム(列)名が無くなる。
    # そのため、再度、pandas形式に変換する(次の2行)。これはグラフの軸名で使用するため。
    x_sc_df = pd.DataFrame(x_sc) # pndas形式のデータフレーム型へ変換
    x_DF_columns = X_df.columns # カラム名(列名)を抽出
    x_sc_df.columns = x_DF_columns # カラム名(列名)を挿入
    
    return x_sc_df

######################################## 説明変数Xを正規化する場合
def my_MinMaxScaler(X_df):
    mscaler = MinMaxScaler()
    x_msc = mscaler.fit_transform(X_df)
    x_msc_df = pd.DataFrame(x_msc)
    x_DF_columns = X_df.columns
    x_msc_df.columns = x_DF_columns
    
    return x_msc_df

######################################## グラフ化
# 行列散布図
def my_scatter_matrix(X_df, target_Yname, important_factor, *, y=None, title='NA'): # *を挟むと、以降の変数をキーワード引数で強制する
    fig = plt.figure(figsize=(12,12))
    ax = fig.add_subplot(1,1,1)
    x = X_df.loc[:,important_factor] # 抽出する列名を指定
    if y is None:
        xy_df = x
    else:
        xy_df = pd.concat([x,y], axis=1) # 横方向に連結。axis=1
    sns.set(style="ticks", font_scale=1.2) #, palette='gray', color_codes=True)
    g = sns.pairplot(xy_df,
                     kind="reg",
                     markers=".",
                     diag_kind="kde",
                     diag_kws=dict(shade=True),
                     )
    g.fig.suptitle(title + ' ' + target_Yname) #, fontsize=12)
    g.fig.subplots_adjust(top=0.9)
    plt.savefig(now + '_scatter_matrix_' + title + '_' + target_Yname + ".png")
    plt.close()

######################################## グラフ化
# 相関性の確認1 オリジナル生データを青点で、回帰式によるデータを赤点でプロット
def plot_scatter(Xname_for_graph, X, target_Yname, Y, pred_Y, title="NA"):
    save_fname = target_Yname + "_LinearRegression_" + title

    #fig = plt.figure(num=2, figsize=(8, 6), dpi=100)
    fig = plt.figure(dpi=120)
    ax = fig.add_subplot(1,1,1)
    ax.set_title('Scatter ' + title, fontsize=14)
    ax.set_xlabel(Xname_for_graph, fontsize=16)
    ax.set_ylabel(target_Yname, fontsize=16)
    ax.scatter(X[Xname_for_graph], Y, facecolors='none', edgecolors='b', label='Raw data')
    ax.scatter(X[Xname_for_graph], pred_Y, facecolors='none', edgecolors='r', label='Multiple regression analysis')
    ax.legend(loc='lower right', fontsize=14)
    ax.tick_params(labelsize=16)
    plt.grid(True)
    plt.tight_layout()
    #plt.show()
    plt.savefig(save_fname + '_scat.png')
    plt.close()

### 近似式 y = ax 用の関数 
def approximation_expression(x, a):
    return a * x

# 相関性の確認2 横軸Y:縦軸Pred_Y
def plot_reg(X, target_Yname, Y, pred_Y, title="original"):
    save_fname = target_Yname + "_LinearRegression_" + title

    # 欠損値を省いたことにより、連番でなくなっている可能性のあるインデックスをリセット
    Y = Y.reset_index(drop=True) # drop=Trueにより、元を削除して、新規インデックスを0から連番で作成
    DF = pd.concat([Y.rename(target_Yname), pred_Y.rename(target_Yname + "_pred")], axis=1)
    DF.to_csv('PredY_vs_Y_' + title + ".csv", sep=',', index=False, header=True, encoding='utf-8')
    
    popt, pcov = optimize.curve_fit(approximation_expression, Y, pred_Y)
    
    ax = plt.figure(num=0, dpi=120).gca() 
    #plt.rcParams["axes.labelsize"] = 15
    ax.set_title("pred vs real " + title, fontsize=14)
    ax.set_xlabel(target_Yname, fontsize=16)
    ax.set_ylabel("Pred\n" + target_Yname, fontsize=16)
    rp = ax.scatter(x = target_Yname,
                    y = target_Yname + "_pred",
                    data = DF,
                    facecolors="none",
                    edgecolors='black') # purple

    # 生データの最小値と最大値
    y_min = DF[target_Yname].min()
    y_max = DF[target_Yname].max()
    # 予測データの最小値と最大値
    y_pred_min = DF[target_Yname + '_pred'].min()
    y_pred_max = DF[target_Yname + '_pred'].max()
    # プロットするためのxデータの範囲を決める
    x_min = min(y_min, y_pred_min)
    x_max = max(y_max, y_pred_max)
    print(x_min)
    print(x_max)

    # グラフのレンジを決める
    x_range = x_max - x_min # データの範囲が100以下か以上かで分ける
    print("x_range = x_max - x_min = " + str(x_range))
    if x_max > 1:
        min_lim = 0
        if x_range <= 10:
            max_lim = math.floor(x_max + 1)
        else:
            max_lim = math.floor(x_max + 10)
        rp.axes.set_xlim(min_lim, max_lim)
        rp.axes.set_ylim(min_lim, max_lim)
    else:
        max_lim = 0
        if x_range <= 100:
            max_lim = math.floor(x_max - 1)
        else:
            max_lim = math.floor(x_max - 10)
        rp.axes.set_xlim(min_lim, max_lim)
        rp.axes.set_ylim(min_lim, max_lim)

    # 近似式プロットのためのデータを作成
    x_approximation = np.linspace(min_lim, max_lim, 10) # numpyでxデータを作成
    y_approximation = popt[0] * x_approximation # 近似式に代入してyデータを作成
    line_approximation = ax.plot(x_approximation, y_approximation,
                                 linestyle = 'dashed', linewidth = 3, color='r') 
    
    rp.axes.set_aspect('equal', adjustable='box')
    plt.grid(True)
    # 凡例を指定して記入する場合は、リストで指定する
    ax.legend([line_approximation[0]], ["y = {:.2f}x".format(popt[0])],
               loc='upper left',
               numpoints=1,
               fontsize=15)
    plt.tick_params(labelsize=15)
    plt.tight_layout() 
    plt.savefig(save_fname + '_real_vs_pred.png') 
    plt.close()

######################################## Pytourch
class Regressor(nn.Module): # ニューラルネットワークのクラス
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(13, 150) # (入力変数の数, ニューロン数)
        self.fc2 = nn.Linear(150, 80) # (ニューロン数, ニューロン)
        self.fc3 = nn.Linear(80, 20) # (ニューロン数, ニューロン)
        self.fc4 = nn.Linear(20, 1) # (ニューロン数, 目的関数の数)
        self.dropout = nn.Dropout(p=0.5) # ドロップアウトを設定

    def forward(self, x):
        #x = x.view(-1, 13)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.dropout(F.relu(self.fc3(x))) # 隠れ層3番目にドロップアウト設定
        x = F.relu(self.fc4(x))

        return x

def my_torch(X_df, Y_df, my_feature_names, target_Yname, title):
    # データを学習用とテスト用に分割
    X_train, X_test, y_train, y_test = train_test_split(X_df, Y_df, test_size=0.3)

    # データをpytorch用のテンソルフロー型へ変換
    train_batch = torch.from_numpy(X_train.values).float()
    label_batch = torch.from_numpy(y_train.values).float().view(-1, 1)
    X_test = torch.from_numpy(X_test.values).float()
    y_test = torch.from_numpy(y_test.values).float().view(-1, 1)

    # 回帰モデルのインスタンス生成
    model = Regressor()
    criterion = nn.MSELoss() # MSE(Mean Squared Error),平均二乗誤差
    optimizer = optim.Adam(model.parameters(), lr=learning_rate) # オプティマイザの設定

    train_losses, test_losses = [], []
    # 学習する
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for i in range(len(train_batch)):
            optimizer.zero_grad()
            output = model(train_batch[i])
            loss = torch.sqrt(criterion(torch.log(output), torch.log(label_batch[i])))
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            
        else:
            test_loss = 0
            accuracy = 0
            
            with torch.no_grad():
                model.eval()
                predictions = model(X_test)
                test_loss += torch.sqrt(criterion(torch.log(predictions), torch.log(y_test)))
                    
            train_losses.append(train_loss/len(train_batch))
            test_losses.append(test_loss)
            if epoch % 10 == 0:
                print("Epoch: {}/{} ".format(epoch, epochs),
                      "Training Loss: {:.3f}.. ".format(train_loss/len(train_batch)),
                      "Test Loss: {:.3f}.. ".format(test_loss))
                #for param in model.parameters():
                #    print(param)
    #print(optimizer.param_groups[0])
    # 学習結果を保存
    with open(now + '_' + 'NN_from_pytorch.csv', 'w') as f:
        for i, param in enumerate(optimizer.param_groups[0]['params']):
            print(i, param)
            buf_list = []
            buf_list.append(str(i))
            buf_list.append(str(param))
            f.writelines(buf_list)
            f.write('\n')
        for key, value in optimizer.param_groups[0].items():
            if key != 'params':
                f.write(key + ',' + str(value) + '\n')

    # 学習過程をグラフへプロット
    plt.plot(train_losses, label='Training loss')
    plt.plot(test_losses, label='Validation loss')
    plt.xlabel('epochs', fontsize = 16)
    plt.ylabel('MSE(Mean Squared Error)', fontsize = 16)
    plt.legend(frameon=False, fontsize=15)
    plt.tick_params(labelsize=16)
    plt.grid()
    plt.tight_layout()
    plt.savefig('loss_vs_epochs.png')
    plt.close()

    test = torch.from_numpy(X_df.values).float()

    # 学習結果を全データを用いて評価。回帰モデルをオブジェクトファイルで保存
    with torch.no_grad():
        model.eval()
        pred_Y_torchtensor = model.forward(test) # 予測結果
        dump(model, now + '_' + 'NN_from_pytorch.joblib') 

        # Pytorchのテンソルフロー型から勾配情報を省いたデータをnumpyアレイ型で取得する
        pred_Y_np = pred_Y_torchtensor.detach().clone().numpy().T[0]
        pred_Y_s = pd.Series(pred_Y_np)
    
    return pred_Y_s

######################################## 指定ファイルを複数一括削除
def my_initialize():
    for f in glob.glob('*.png'):
        os.remove(f)
    for f in glob.glob('data*'):
        os.remove(f)
    for f in glob.glob('20*'):
        os.remove(f)
    for f in glob.glob('Tree*'):
        os.remove(f)

######################################## メイン関数
def main():
    # 初期化(前回実行時に生成されたファイルを削除)
    my_initialize()
    # 目的変数が複数ある場合、順番に実行する
    for target_Yname in target_Yname_list:
        # データセットからデータ抽出
        X_df, Y_df, my_feature_names = my_load_csv(target_Yname)

        # 説明変数Xの前処理
        if scalerjudge == 1: # 標準化する場合
            X_df = my_StandardScaler(X_df)
        elif scalerjudge == 2: # 正規化する場合
            X_df = my_MinMaxScaler(X_df)
        else:
            pass
        X_df.describe().to_csv('dataX_describe.csv', index=True, encoding='utf-8', mode='a', header=True)

        # 回帰分析の実行
        pred_Y = my_torch(X_df, Y_df, my_feature_names, target_Yname, title)

        # グラフ作成
        plot_scatter(Xname_for_graph, X_df, target_Yname, Y_df, pred_Y, title)
        plot_reg(X_df, target_Yname, Y_df, pred_Y, title)

######################################## パラメータ
if __name__ == '__main__':
    ##### parameter
    file_path = "yyy_boston_XYdata.csv" # 読み込む生データcsvファイル
    title = file_path[:-4]
    input_factors = 13 # 設計因子の数(読み込んだcsvの左端列からの数までを入力因子とする)
    Xname_for_graph = 'RM'
    target_Yname_list = ['PRICE'] # 目的変数Y リストで複数指定できる

    ### 生データcsvファイルの読み込み設定
    start_row = 1 # 読み込み開始行。1行目なら1, 3行目なら3, …
    target_value2ln = False # Yを対数変換するか?
    check_flag = True # True:NaNやNoneなどの欠損値を除外する場合。False:指定文字列を残す場合
    check_name = "ok" # 上記check_flagが「False」の場合に機能する。指定文字列を残す
    check_duplicates_columns=[] # ダブりを省く列名を指定。複数可。省かない場合は[]と空リストにする
    check_list = [check_flag, check_name, check_duplicates_columns]

    input_columns = (0, input_factors) # 設計因子の列番号(開始, 終了)
    out_start_column = 100 # この列以降のデータについて、欠損値の処理を行う
    scalerjudge = 0 # Xデータの前処理:生データそのままの場合は0、 標準化する場合は1、正規化する場合は2

    epochs = 100
    learning_rate = 0.001 

    ##### call function
    main()
    print("finished")

ニューラルネットワーク(Neural Network, NN)のフレームワークはscikit-learnや TensorFlowなど色々あり、使い勝手が異なります。Pytorchの操作性と可読性は悪くはないです。しかし、今後はTensorFlow2.0以降で本格的に採用されたKerasによるシーケンシャルな手法が主流になってゆくと個人的には思います。

以上

<広告>