本記事は、Pytorchのインストール方法とコードの雛形について載せました。
▼インストール手順
はじめに、下記リンクの本家webページへ飛びます。
次に、リンク先で下へスクロールすると、下図のようなのが出現するため、自分の使用しているPC環境を選択する。すると、下図青選択箇所に、インストールするために適切なコマンドが出現する。これをコピペでインストールすればOK。
▼本記事のコード実行例
下図は学習過程を視覚化したグラフです。横軸は学習回数で、縦軸は学習指標(のひとつ)MSE(平均2乗誤差)です。右肩下がりで学習している様子がわかります。
下図は、学習後の適合度を可視化したグラフです。青点が生データで、赤点が学習後の予測結果です。見た目にも、そこそこ適合している様子を確認できます。
下図は横軸が生データで、縦軸が学習後の予測結果を可視化したグラフです。比例の傾きが1に近い程、予測の傾向を再現していることを示します。
■本プログラム
本コードを実行するための分析用の生データは、例えば右記リンク先で作成したcsvファイルでお試し出来ます。Python scikit-learn付属のボストン市の住宅価格データ(Boston house prices dataset)をcsvファイル化する - HK29’s blog
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # https://pytorch.org/ # https://pytorch.org/docs/stable/tensors.html import os, sys, glob import pandas as pd import numpy as np import scipy as sp from scipy import optimize import math from sklearn.preprocessing import StandardScaler from sklearn.preprocessing import MinMaxScaler from sklearn.model_selection import train_test_split import torch from torch import nn, optim from torch.utils.data import DataLoader import torch.nn.functional as F import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt import seaborn as sns from joblib import dump, load import datetime now = datetime.datetime.now() now = now.strftime("%y%m%d") ######################################## csvファイルからデータをロードし前処理する def my_load_csv(target_Yname): #check_flag, check_name, check_duplicates_columns = check_list df = pd.read_csv(file_path, skiprows=start_row - 1, encoding="utf-8") #"shift-jis") print("df -> " + str(df)) print(df.dtypes) # pandasの列毎の型確認。その列に文字列があるかで決まる。但し、object型は要素毎に異なる可能性がある。 df.replace('none', '0', inplace=True) # 置換。 inplace=Trueでdfが置換される(df=と新規に代入しなくてよい) df.replace('FALSE', '0', inplace=True) df_buf1 = df.iloc[:, :out_start_column] df_buf2 = df.iloc[:, out_start_column:] # ilocとreplaceは同時に使用できなかったため、一旦分割して処理する df_buf2.replace('0', np.nan, inplace=True) #文字列0をnumpy形式のNaNにする df_buf2.replace(0, np.nan, inplace=True) #数値0をnumpy形式のNaNにする #columns_list = df_buf2.columns #df_buf2[columns_list] = pd.to_numeric(df_buf2[columns_list], errors='coerce') #文字列をNaNへ置換する df = pd.concat([df_buf1, df_buf2], axis=1) #データフレームの列を連結して、ひとつのdfへ戻す df.to_csv("data_nan.csv", sep=",", header=True, index=False) print('check NaN -> ' + str(df.isnull().any())) # ひとつでもNaNがある列はTrue、そうでなければFalseと表示される。 print(df.dtypes) if check_duplicates_columns: #指定した列名があった場合、重複行を削除する df2 = df.drop_duplicates(check_duplicates_columns, keep='first') #最初の行が重複とみなされずに残す else: df2 = df print("df2 -> " + str(df2)) if check_flag: df3 = df2.dropna(subset=[target_Yname]) # NaNやNoneなどの欠損値を除外する場合 else: df3 = df2[df2[target_Yname].str.contains(check_name, na=False)] # 指定文字列の行を残す場合 print("df3 -> " + str(df3)) if target_value2ln: # 目的変数Yを対数に変換する場合 df4 = df3.assign(tmp = np.log(df3[target_Yname])) target_Yname = target_Yname + '_ln' tmp_columns = df4.columns.values print("tmp_columns -> " + str(tmp_columns)) tmp_columns[-1] = target_Yname df4.columns = tmp_columns print("df4 -> " + str(df4)) df4.to_csv("data_df4.csv", sep=",", header=True, index=False) DF = df4 else: DF = df3 x_DF = DF.iloc[:, input_columns[0]:input_columns[1]] print("x_DF -> " + str(x_DF)) y = DF.loc[:,target_Yname] x_DF_columns = x_DF.columns #print("x_DF_columns -> " + str(x_DF_columns)) DF.to_csv("data_DF.csv", sep=",", header=True, index=False) #print('check2 NaN -> ' + str(DF.isnull().any())) return x_DF, y, x_DF_columns ######################################## 説明変数Xを標準化する場合 def my_StandardScaler(X_df): scaler = StandardScaler() x_sc = scaler.fit_transform(X_df) # sklearnの上記のようなfit関連の実行後、データの書式がnumpy形式となりカラム(列)名が無くなる。 # そのため、再度、pandas形式に変換する(次の2行)。これはグラフの軸名で使用するため。 x_sc_df = pd.DataFrame(x_sc) # pndas形式のデータフレーム型へ変換 x_DF_columns = X_df.columns # カラム名(列名)を抽出 x_sc_df.columns = x_DF_columns # カラム名(列名)を挿入 return x_sc_df ######################################## 説明変数Xを正規化する場合 def my_MinMaxScaler(X_df): mscaler = MinMaxScaler() x_msc = mscaler.fit_transform(X_df) x_msc_df = pd.DataFrame(x_msc) x_DF_columns = X_df.columns x_msc_df.columns = x_DF_columns return x_msc_df ######################################## グラフ化 # 行列散布図 def my_scatter_matrix(X_df, target_Yname, important_factor, *, y=None, title='NA'): # *を挟むと、以降の変数をキーワード引数で強制する fig = plt.figure(figsize=(12,12)) ax = fig.add_subplot(1,1,1) x = X_df.loc[:,important_factor] # 抽出する列名を指定 if y is None: xy_df = x else: xy_df = pd.concat([x,y], axis=1) # 横方向に連結。axis=1 sns.set(style="ticks", font_scale=1.2) #, palette='gray', color_codes=True) g = sns.pairplot(xy_df, kind="reg", markers=".", diag_kind="kde", diag_kws=dict(shade=True), ) g.fig.suptitle(title + ' ' + target_Yname) #, fontsize=12) g.fig.subplots_adjust(top=0.9) plt.savefig(now + '_scatter_matrix_' + title + '_' + target_Yname + ".png") plt.close() ######################################## グラフ化 # 相関性の確認1 オリジナル生データを青点で、回帰式によるデータを赤点でプロット def plot_scatter(Xname_for_graph, X, target_Yname, Y, pred_Y, title="NA"): save_fname = target_Yname + "_LinearRegression_" + title #fig = plt.figure(num=2, figsize=(8, 6), dpi=100) fig = plt.figure(dpi=120) ax = fig.add_subplot(1,1,1) ax.set_title('Scatter ' + title, fontsize=14) ax.set_xlabel(Xname_for_graph, fontsize=16) ax.set_ylabel(target_Yname, fontsize=16) ax.scatter(X[Xname_for_graph], Y, facecolors='none', edgecolors='b', label='Raw data') ax.scatter(X[Xname_for_graph], pred_Y, facecolors='none', edgecolors='r', label='Multiple regression analysis') ax.legend(loc='lower right', fontsize=14) ax.tick_params(labelsize=16) plt.grid(True) plt.tight_layout() #plt.show() plt.savefig(save_fname + '_scat.png') plt.close() ### 近似式 y = ax 用の関数 def approximation_expression(x, a): return a * x # 相関性の確認2 横軸Y:縦軸Pred_Y def plot_reg(X, target_Yname, Y, pred_Y, title="original"): save_fname = target_Yname + "_LinearRegression_" + title # 欠損値を省いたことにより、連番でなくなっている可能性のあるインデックスをリセット Y = Y.reset_index(drop=True) # drop=Trueにより、元を削除して、新規インデックスを0から連番で作成 DF = pd.concat([Y.rename(target_Yname), pred_Y.rename(target_Yname + "_pred")], axis=1) DF.to_csv('PredY_vs_Y_' + title + ".csv", sep=',', index=False, header=True, encoding='utf-8') popt, pcov = optimize.curve_fit(approximation_expression, Y, pred_Y) ax = plt.figure(num=0, dpi=120).gca() #plt.rcParams["axes.labelsize"] = 15 ax.set_title("pred vs real " + title, fontsize=14) ax.set_xlabel(target_Yname, fontsize=16) ax.set_ylabel("Pred\n" + target_Yname, fontsize=16) rp = ax.scatter(x = target_Yname, y = target_Yname + "_pred", data = DF, facecolors="none", edgecolors='black') # purple # 生データの最小値と最大値 y_min = DF[target_Yname].min() y_max = DF[target_Yname].max() # 予測データの最小値と最大値 y_pred_min = DF[target_Yname + '_pred'].min() y_pred_max = DF[target_Yname + '_pred'].max() # プロットするためのxデータの範囲を決める x_min = min(y_min, y_pred_min) x_max = max(y_max, y_pred_max) print(x_min) print(x_max) # グラフのレンジを決める x_range = x_max - x_min # データの範囲が100以下か以上かで分ける print("x_range = x_max - x_min = " + str(x_range)) if x_max > 1: min_lim = 0 if x_range <= 10: max_lim = math.floor(x_max + 1) else: max_lim = math.floor(x_max + 10) rp.axes.set_xlim(min_lim, max_lim) rp.axes.set_ylim(min_lim, max_lim) else: max_lim = 0 if x_range <= 100: max_lim = math.floor(x_max - 1) else: max_lim = math.floor(x_max - 10) rp.axes.set_xlim(min_lim, max_lim) rp.axes.set_ylim(min_lim, max_lim) # 近似式プロットのためのデータを作成 x_approximation = np.linspace(min_lim, max_lim, 10) # numpyでxデータを作成 y_approximation = popt[0] * x_approximation # 近似式に代入してyデータを作成 line_approximation = ax.plot(x_approximation, y_approximation, linestyle = 'dashed', linewidth = 3, color='r') rp.axes.set_aspect('equal', adjustable='box') plt.grid(True) # 凡例を指定して記入する場合は、リストで指定する ax.legend([line_approximation[0]], ["y = {:.2f}x".format(popt[0])], loc='upper left', numpoints=1, fontsize=15) plt.tick_params(labelsize=15) plt.tight_layout() plt.savefig(save_fname + '_real_vs_pred.png') plt.close() ######################################## Pytourch class Regressor(nn.Module): # ニューラルネットワークのクラス def __init__(self): super().__init__() self.fc1 = nn.Linear(13, 150) # (入力変数の数, ニューロン数) self.fc2 = nn.Linear(150, 80) # (ニューロン数, ニューロン) self.fc3 = nn.Linear(80, 20) # (ニューロン数, ニューロン) self.fc4 = nn.Linear(20, 1) # (ニューロン数, 目的関数の数) self.dropout = nn.Dropout(p=0.5) # ドロップアウトを設定 def forward(self, x): #x = x.view(-1, 13) x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) x = self.dropout(F.relu(self.fc3(x))) # 隠れ層3番目にドロップアウト設定 x = F.relu(self.fc4(x)) return x def my_torch(X_df, Y_df, my_feature_names, target_Yname, title): # データを学習用とテスト用に分割 X_train, X_test, y_train, y_test = train_test_split(X_df, Y_df, test_size=0.3) # データをpytorch用のテンソルフロー型へ変換 train_batch = torch.from_numpy(X_train.values).float() label_batch = torch.from_numpy(y_train.values).float().view(-1, 1) X_test = torch.from_numpy(X_test.values).float() y_test = torch.from_numpy(y_test.values).float().view(-1, 1) # 回帰モデルのインスタンス生成 model = Regressor() criterion = nn.MSELoss() # MSE(Mean Squared Error),平均二乗誤差 optimizer = optim.Adam(model.parameters(), lr=learning_rate) # オプティマイザの設定 train_losses, test_losses = [], [] # 学習する for epoch in range(epochs): model.train() train_loss = 0 for i in range(len(train_batch)): optimizer.zero_grad() output = model(train_batch[i]) loss = torch.sqrt(criterion(torch.log(output), torch.log(label_batch[i]))) loss.backward() optimizer.step() train_loss += loss.item() else: test_loss = 0 accuracy = 0 with torch.no_grad(): model.eval() predictions = model(X_test) test_loss += torch.sqrt(criterion(torch.log(predictions), torch.log(y_test))) train_losses.append(train_loss/len(train_batch)) test_losses.append(test_loss) if epoch % 10 == 0: print("Epoch: {}/{} ".format(epoch, epochs), "Training Loss: {:.3f}.. ".format(train_loss/len(train_batch)), "Test Loss: {:.3f}.. ".format(test_loss)) #for param in model.parameters(): # print(param) #print(optimizer.param_groups[0]) # 学習結果を保存 with open(now + '_' + 'NN_from_pytorch.csv', 'w') as f: for i, param in enumerate(optimizer.param_groups[0]['params']): print(i, param) buf_list = [] buf_list.append(str(i)) buf_list.append(str(param)) f.writelines(buf_list) f.write('\n') for key, value in optimizer.param_groups[0].items(): if key != 'params': f.write(key + ',' + str(value) + '\n') # 学習過程をグラフへプロット plt.plot(train_losses, label='Training loss') plt.plot(test_losses, label='Validation loss') plt.xlabel('epochs', fontsize = 16) plt.ylabel('MSE(Mean Squared Error)', fontsize = 16) plt.legend(frameon=False, fontsize=15) plt.tick_params(labelsize=16) plt.grid() plt.tight_layout() plt.savefig('loss_vs_epochs.png') plt.close() test = torch.from_numpy(X_df.values).float() # 学習結果を全データを用いて評価。回帰モデルをオブジェクトファイルで保存 with torch.no_grad(): model.eval() pred_Y_torchtensor = model.forward(test) # 予測結果 dump(model, now + '_' + 'NN_from_pytorch.joblib') # Pytorchのテンソルフロー型から勾配情報を省いたデータをnumpyアレイ型で取得する pred_Y_np = pred_Y_torchtensor.detach().clone().numpy().T[0] pred_Y_s = pd.Series(pred_Y_np) return pred_Y_s ######################################## 指定ファイルを複数一括削除 def my_initialize(): for f in glob.glob('*.png'): os.remove(f) for f in glob.glob('data*'): os.remove(f) for f in glob.glob('20*'): os.remove(f) for f in glob.glob('Tree*'): os.remove(f) ######################################## メイン関数 def main(): # 初期化(前回実行時に生成されたファイルを削除) my_initialize() # 目的変数が複数ある場合、順番に実行する for target_Yname in target_Yname_list: # データセットからデータ抽出 X_df, Y_df, my_feature_names = my_load_csv(target_Yname) # 説明変数Xの前処理 if scalerjudge == 1: # 標準化する場合 X_df = my_StandardScaler(X_df) elif scalerjudge == 2: # 正規化する場合 X_df = my_MinMaxScaler(X_df) else: pass X_df.describe().to_csv('dataX_describe.csv', index=True, encoding='utf-8', mode='a', header=True) # 回帰分析の実行 pred_Y = my_torch(X_df, Y_df, my_feature_names, target_Yname, title) # グラフ作成 plot_scatter(Xname_for_graph, X_df, target_Yname, Y_df, pred_Y, title) plot_reg(X_df, target_Yname, Y_df, pred_Y, title) ######################################## パラメータ if __name__ == '__main__': ##### parameter file_path = "yyy_boston_XYdata.csv" # 読み込む生データcsvファイル title = file_path[:-4] input_factors = 13 # 設計因子の数(読み込んだcsvの左端列からの数までを入力因子とする) Xname_for_graph = 'RM' target_Yname_list = ['PRICE'] # 目的変数Y リストで複数指定できる ### 生データcsvファイルの読み込み設定 start_row = 1 # 読み込み開始行。1行目なら1, 3行目なら3, … target_value2ln = False # Yを対数変換するか? check_flag = True # True:NaNやNoneなどの欠損値を除外する場合。False:指定文字列を残す場合 check_name = "ok" # 上記check_flagが「False」の場合に機能する。指定文字列を残す check_duplicates_columns=[] # ダブりを省く列名を指定。複数可。省かない場合は[]と空リストにする check_list = [check_flag, check_name, check_duplicates_columns] input_columns = (0, input_factors) # 設計因子の列番号(開始, 終了) out_start_column = 100 # この列以降のデータについて、欠損値の処理を行う scalerjudge = 0 # Xデータの前処理:生データそのままの場合は0、 標準化する場合は1、正規化する場合は2 epochs = 100 learning_rate = 0.001 ##### call function main() print("finished")
ニューラルネットワーク(Neural Network, NN)のフレームワークはscikit-learnや TensorFlowなど色々あり、使い勝手が異なります。Pytorchの操作性と可読性は悪くはないです。しかし、今後はTensorFlow2.0以降で本格的に採用されたKerasによるシーケンシャルな手法が主流になってゆくと個人的には思います。
以上
<広告>
リンク