Python scikit-learnによる交互作用の項を考慮した重回帰分析

'20/02/15更新
回帰分析後の相関性確認用のグラフ上に、y=axの近似式を入れる仕様にした
 本記事では、回帰モデル(回帰式)の項に、相互作用項として「因子A×因子B」や「因子A×因子B×因子C」等が加えられます。

f:id:HK29:20191207164949p:plain

上図のように分析するcsvファイルがあればよい。本例ではscikit-learnに同梱されているbostonデータセットを予めcsvファイルに出力したものを使用した。作成方法は右記参照Python scikit-learn 付属のボストン市の住宅価格データ(Boston house prices dataset)をcsvファイル化する - HK29’s blog

#####
 本記事では、重回帰分析の交互作用項の有無による分析結果の比較もした。目的変数Yは住宅価格、説明変数Xは部屋数や犯罪率など13因子である。まず、「交互作用の項を考慮しない場合」である。下図をみると、RM(部屋数)>RAD(環状高速道路へのアクセスの良さ)>ZN(住宅の占める割合)>…の順で住宅価格が高くなる傾向であり、納得がゆく。

f:id:HK29:20200119122022p:plain

下図は青点が生データ、赤点は重回帰分析で得られた回帰式に生データXを代入した散布図である。図中の上部の青点が横一直線に並んでいるのは、外れ値もしくは交互作用による効果が考えられるがこれだけでは判断できない。

f:id:HK29:20191221173821p:plain

下図は横軸に目的変数Yのデータ、縦軸に回帰式による予測結果である。

f:id:HK29:20200215213735p:plain

 次に、「交互作用の項を考慮した場合」を考える。組み合わせ数はnCk=n!/(k!(n-k)!で計算できる。元が13因子あるために組み合わせ2項の場合はプラス91因子、3項とした場合はプラス286因子であり、合計377因子となる。相互作用を考慮した重回帰分析の結果を以降に示す。部屋数「RM」が第一位になっていない。参考までに、データによっては正規化や標準化をした方が良い場合もある。

 

下図をみると、視覚的にも当てはまりはよくなっており、交互作用の項を加味することで再現性が高くなっている様子がわかる

f:id:HK29:20191221173453p:plain

交互作用の項を加味することで適合度が良くなっている様子がわかる(下図)。

f:id:HK29:20200215213806p:plain

特徴量は爆増するため、正側の上位5つと負側の上位5つを抜粋して、下図のように横棒グラフ化する仕様にした('20/01/19更新)

f:id:HK29:20200119122231p:plain

下図のように、作成した回帰式の関数をモジュール化して「.py」pythonスクリプトファイルで保存します。

f:id:HK29:20191221171446p:plain

f:id:HK29:20191221171534p:plain

▼本プログラム

交互作用項の作成にはitertools.combinationsを使用した。itertoolsの特徴は多重ループの作成を簡便にできることである。

# -*- coding: utf-8 -*-
import os, sys, glob
import pandas as pd
import numpy as np
import scipy as sp
from scipy import optimize
import math
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
from sklearn import datasets
from sklearn import linear_model
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from joblib import dump, load
import datetime
import linecache

now = datetime.datetime.now()
now = now.strftime("%y%m%d")

######################################## csvファイルからデータをロードし前処理する
def my_load_csv(path, check_column, check_duplicate_columns,
                start_row, input_columns, out_start_column,
                target_Yname, target_value2ln):
    df = pd.read_csv(path, skiprows=start_row - 1, encoding="shift-jis")
    #print("df -> " + str(df))
    #print(df.dtypes) # pandasの列毎の型確認。その列に文字列があるかで決まる。但し、object型は要素毎に異なる可能性がある。
    df.replace('none', '0', inplace=True) # 置換。 inplace=Trueでdfが置換される(df=と新規に代入しなくてよい)
    df.replace('FALSE', '0', inplace=True)
    df_buf1 = df.iloc[:, :out_start_column]
    df_buf2 = df.iloc[:, out_start_column:] # ilocとreplaceは同時に使用できなかったため、一旦分割して処理する
    
    df_buf2.replace('0', np.nan, inplace=True) #文字列0をnumpy形式のNaNにする
    df_buf2.replace(0, np.nan, inplace=True) #数値0をnumpy形式のNaNにする    
    
    #columns_list = df_buf2.columns
    #df_buf2[columns_list] = pd.to_numeric(df_buf2[columns_list], errors='coerce') #文字列をNaNへ置換する
    
    df = pd.concat([df_buf1, df_buf2], axis=1) #データフレームの列を連結して、ひとつのdfへ戻す
    df.to_csv("data_nan.csv", sep=",", header=True, index=False)
    #print('check NaN -> ' + str(df.isnull().any())) # ひとつでもNaNがある列はTrue、そうでなければFalseと表示される。
    #print(df.dtypes)
    
    if check_duplicate_columns: #指定した列名があった場合、重複行を削除する
        df2 = df.drop_duplicates(check_duplicate_columns, keep='first') #最初の行が重複とみなされずに残す
    else:
        df2 = df
    #print("df2 -> " + str(df2))
    
    if check_column[0]:
        df3 = df2.dropna(subset=[check_column[1]]) # NaNやNoneなどの欠損値を除外する場合
    else:
        df3 = df2[df2[check_column[1]].str.contains(check_column[2], na=False)] # 指定文字列の行を残す場合
    #print("df3 -> " + str(df3))
    
    if target_value2ln: # 目的変数Yを対数に変換する場合
        df4 = df3.assign(tmp = np.log(df3[target_Yname]))
        target_Yname = target_Yname + '_ln'
        tmp_columns = df4.columns.values
        #print("tmp_columns -> " + str(tmp_columns))
        tmp_columns[-1] = target_Yname
        df4.columns = tmp_columns
        #print("df4 -> " + str(df4))
        df4.to_csv("data_df4.csv", sep=",", header=True, index=False)
        DF = df4
    else:
        DF = df3
        
    x_DF = DF.iloc[:, input_columns[0]:input_columns[1]]
    #print("x_DF -> " + str(x_DF))
    y = DF.loc[:,target_Yname] 
    x_DF_columns = x_DF.columns
    #print("x_DF_columns -> " + str(x_DF_columns))
    DF.to_csv("data_DF.csv", sep=",", header=True, index=False)
    #print('check2 NaN -> ' + str(DF.isnull().any()))
    
    return x_DF, y, x_DF_columns

######################################## 交互作用項のデータ作成
def my_InteractionData(my_feature_names, X):
    #print(list(itertools.combinations(my_feature_names, 2))) # 2項の交互作用の組み合わせ。itertoolsを利用
    interact_names=[]
    for i in itertools.combinations(my_feature_names, 2):
        interact_names.append(i[0]+" * "+i[1])
    new_fnames = my_feature_names.tolist() + interact_names # 新規の説明変数のリストを作成。既存名に追加の場合はextend
    new_fnames_np = np.array(new_fnames) # リストをnumpy形式へ変換
    #print(new_fnames_np)
    #print(new_fnames_np.shape)

    x_np = X.values # pandas形式をnumpy形式へ変換。 X.as_matrix()でも可。
    #print(x_np.shape) # shapeは次元の要素数を示す。例:2次元の場合(行数,列数)となる
    for i, j in itertools.combinations(range(x_np.shape[1]), 2):
        interact_columnvalue = np.array([x_np[:,i]*x_np[:,j]]) # 配列の次元を2次元に合わすために2重の[]で括る
        x_np = np.concatenate((x_np, interact_columnvalue.T), axis=1) # 転置行列にして、列で挿入 
    x_df2 = pd.DataFrame(x_np) # pandas形式へ変換 
    x_df2.columns = new_fnames_np # カラム名を追記
    #print(x_df2)

    #print(list(itertools.combinations(my_feature_names, 3))) # 3項の交互作用の組み合わせ。itertoolsを利用
    interact_names=[]
    for i in itertools.combinations(my_feature_names, 3):
        interact_names.append(i[0]+" * "+i[1]+" * "+i[2])
    new_fnames = my_feature_names.tolist() + interact_names # 新規の説明変数のリストを作成。既存名に追加の場合はextend
    new_fnames_np = np.array(new_fnames) # リストをnumpy形式へ変換
    #print(new_fnames_np)
    #print(new_fnames_np.shape)

    x_np = X.values # pandas形式をnumpy形式へ変換。 X.as_matrix()でも可。
    #print(x_np.shape) # shapeは次元の要素数を示す。例:2次元の場合(行数,列数)となる
    for i, j, k in itertools.combinations(range(x_np.shape[1]), 3):
        interact_columnvalue = np.array([x_np[:,i] * x_np[:,j] * x_np[:,k]]) # 配列の次元を2次元に合わすために2重の[]で括る
        x_np = np.concatenate((x_np, interact_columnvalue.T), axis=1) # 転置行列にして、列で挿入 
    x_df3 = pd.DataFrame(x_np) # pandas形式へ変換 
    x_df3.columns = new_fnames_np # カラム名を追記
    #print(x_df3)
    
    x_df4 = pd.concat([x_df2, x_df3], axis=1)
    x_df = x_df4.loc[:, ~x_df4.columns.duplicated()]
    x_df.to_csv('data_new_features.csv', sep=',', index=False, encoding='utf-8', header=True)    
    
    return x_df, new_fnames_np

######################################## 説明変数Xを標準化する場合
def my_StandardScaler(X_df):
    scaler = StandardScaler()
    x_sc = scaler.fit_transform(X_df)  
    # sklearnの上記のようなfit関連の実行後、データの書式がnumpy形式となりカラム(列)名が無くなる。
    # そのため、再度、pandas形式に変換する(次の2行)。これはグラフの軸名で使用するため。
    x_sc_df = pd.DataFrame(x_sc) # pndas形式のデータフレーム型へ変換
    x_DF_columns = X_df.columns # カラム名(列名)を抽出
    x_sc_df.columns = x_DF_columns # カラム名(列名)を挿入
    
    return x_sc_df

######################################## 説明変数Xを正規化する場合
def my_MinMaxScaler(X_df):
    mscaler = MinMaxScaler()
    x_msc = mscaler.fit_transform(X_df)
    x_msc_df = pd.DataFrame(x_msc)
    x_DF_columns = X_df.columns
    x_msc_df.columns = x_DF_columns
    
    return x_msc_df

######################################## 回帰分析
def my_LinearRegression(X, Y, target_Yname, title="NA"):
    save_fname = target_Yname + "_LinearRegression_" + title

    regr = linear_model.LinearRegression(fit_intercept=True, normalize=False, copy_X=True, n_jobs=1)
    regr.fit(X, Y)
    output = pd.DataFrame({"Name":X.columns, "Coefficients":regr.coef_}).sort_values(by='Coefficients')
    print(output)
    print(regr.intercept_) # 切片
    print(regr.score(X,Y)) # R^2
    with open(save_fname + '_interf_R2.txt', 'w') as f:
        print(output, file=f)
        print(regr.intercept_, file=f)
        print(regr.score(X,Y), file=f)     
    extracted_df = pd.DataFrame(output)
    extracted_df.to_csv(save_fname + '_Coeff.csv', sep=',', index=False, encoding='utf-8', header=True)

    plt.figure(dpi=120) 
    if input_factors > 10:
        output1_1 = output.sort_values(by='Coefficients', ascending=False)[:5] # pandas最初の5行を抽出
        output1_2 = output.sort_values(by='Coefficients', ascending=False)[-5:] # pandas末尾の5行を抽出
        output2 = pd.concat([output1_1, output1_2]) 
    else:
        output2 = output.sort_values(by='Coefficients', ascending=False)[:]
    output2 = output2.sort_values(by='Coefficients', ascending=True)
    x=output2.Name
    y=output2.Coefficients
    plt.barh(range(len(x)), y, tick_label=x, align="center", color="magenta", height=0.7) # 軸が文字列の場合、ソートされてしまう対策
    plt.title(title, fontsize=14)
    plt.xlabel('Coefficients', fontsize = 14)
    plt.tick_params(labelsize=14)
    plt.tight_layout()
    #plt.show()
    plt.savefig(save_fname + '_Coeff.png')
    dump(regr, now + '_' + save_fname + '_model.joblib')
    
    pred_Y_np = regr.predict(X)
    pred_Y = pd.Series(pred_Y_np)
    
    return pred_Y, title

######################################## グラフ化
# 相関性の確認1 オリジナル生データを青点で、回帰式によるデータを赤点でプロット
def plot_scatter(Xname_for_graph, X, target_Yname, Y, pred_Y, title="NA"):
    save_fname = target_Yname + "_LinearRegression_" + title

    #fig = plt.figure(num=2, figsize=(8, 6), dpi=100)
    fig = plt.figure(dpi=120)
    ax = fig.add_subplot(1,1,1)
    ax.set_title('Scatter ' + title, fontsize=14)
    ax.set_xlabel(Xname_for_graph, fontsize=16)
    ax.set_ylabel(target_Yname, fontsize=16)
    ax.scatter(X[Xname_for_graph], Y, facecolors='none', edgecolors='b', label='Raw data')
    ax.scatter(X[Xname_for_graph], pred_Y, facecolors='none', edgecolors='r', label='Multiple regression analysis')
    ax.legend(loc='lower right', fontsize=14)
    ax.tick_params(labelsize=16)
    plt.grid(True)
    plt.tight_layout()
    #plt.show()
    plt.savefig(save_fname + '_scat.png')
    plt.close()

### 近似式 y = ax 用の関数 
def approximation_expression(x, a):
    return a * x

# 相関性の確認2 横軸Y:縦軸Pred_Y
def plot_reg(X, target_Yname, Y, pred_Y, title="original"):
    save_fname = target_Yname + "_LinearRegression_" + title

    # 欠損値を省いたことにより、連番でなくなっている可能性のあるインデックスをリセット
    Y = Y.reset_index(drop=True) # drop=Trueにより、元を削除して、新規インデックスを0から連番で作成
    DF = pd.concat([Y.rename(target_Yname), pred_Y.rename(target_Yname + "_pred")], axis=1)
    DF.to_csv('PredY_vs_Y_' + title + ".csv", sep=',', index=False, header=True, encoding='utf-8')
    
    popt, pcov = optimize.curve_fit(approximation_expression, Y, pred_Y)
    
    ax = plt.figure(num=0, dpi=120).gca() 
    #plt.rcParams["axes.labelsize"] = 15
    ax.set_title("pred vs real " + title, fontsize=14)
    ax.set_xlabel(target_Yname, fontsize=16)
    ax.set_ylabel("Pred\n" + target_Yname, fontsize=16)
    rp = ax.scatter(x = target_Yname,
                    y = target_Yname + "_pred",
                    data = DF,
                    facecolors="none",
                    edgecolors='black') # purple

    # 生データの最小値と最大値
    y_min = DF[target_Yname].min()
    y_max = DF[target_Yname].max()
    # 予測データの最小値と最大値
    y_pred_min = DF[target_Yname + '_pred'].min()
    y_pred_max = DF[target_Yname + '_pred'].max()
    # プロットするためのxデータの範囲を決める
    x_min = min(y_min, y_pred_min)
    x_max = max(y_max, y_pred_max)

    # グラフのレンジを決める
    x_range = x_max - x_min # データの範囲が100以下か以上かで分ける
    print("x_range = x_max - x_min = " + str(x_range))
    if x_max > 1:
        min_lim = 0
        if x_range <= 10:
            max_lim = math.floor(x_max + 1)
        else:
            max_lim = math.floor(x_max + 10)
        rp.axes.set_xlim(min_lim, max_lim)
        rp.axes.set_ylim(min_lim, max_lim)
    else:
        max_lim = 0
        if x_range <= 100:
            max_lim = math.floor(x_max - 1)
        else:
            max_lim = math.floor(x_max - 10)
        rp.axes.set_xlim(min_lim, max_lim)
        rp.axes.set_ylim(min_lim, max_lim)

    # 近似式プロットのためのデータを作成
    x_approximation = np.linspace(min_lim, max_lim, 10) # numpyでxデータを作成
    y_approximation = popt[0] * x_approximation # 近似式に代入してyデータを作成
    line_approximation = ax.plot(x_approximation, y_approximation,
                                 linestyle = 'dashed', linewidth = 3, color='r') 
    
    rp.axes.set_aspect('equal', adjustable='box')
    plt.grid(True)
    # 凡例を指定して記入する場合は、リストで指定する
    ax.legend([line_approximation[0]], ["y = {:.2f}x".format(popt[0])],
               loc='upper left',
               numpoints=1,
               fontsize=15)
    plt.tick_params(labelsize=15)
    plt.tight_layout() 
    plt.savefig(save_fname + '_real_vs_pred.png') 
    plt.close()

def create_func_in_module(target_Yname, my_feature_names, title='NA'):
    save_fname = target_Yname + "_LinearRegression_" + title
    input_interf_file = save_fname + "_interf_R2.txt"
    input_coef_file = save_fname + "_Coeff.csv"    
    
    # 切片読み出し
    lastrow = sum(1 for i in open(input_interf_file)) #最終行番号を取得
    print('lastrow ->' + str(lastrow))
    
    R2 = linecache.getline(input_interf_file, lastrow) #指定行を文字列として取得 
    print('R2 ->{:.1%}'.format(float(R2)))
    R2 = R2[:-2] #改行コード\nを除去する場合
    interf = linecache.getline(input_interf_file, lastrow - 1)
    print('interf ->' + str(interf))
    interf = interf[:-2]

    df = pd.read_csv(input_coef_file)#, delimiter=',', header=None, skiprows=1)
    print(df)

    with open(now + '_' + save_fname + '_module.py', 'w') as f:
        tmplist=[]
        i=0
        f.write('#!/usr/bin/env python' + '\n') #文末の\nは改行コード
        f.write('# -*- coding: utf-8 -*-' + '\n\n') #改行2回する場合は2回書く
        f.write('# R2={:.1%}'.format(float(R2)) + '\n\n')
        f.write('def ' + title + '_func(')
        my_features = ','.join(my_feature_names.tolist())
        f.writelines(my_features)
        f.write('):' + '\n')
        f.write('    c = ' + interf + '\n')
        for val, name in zip(df['Coefficients'], df['Name']) : #2列の各行の値を同時取得
            tmp = '    z' + str(i) + ' = ' + str(val) + ' * ' + str(name)
            #print(tmp)
            f.write(tmp + '\n')
            if i == 0:
                tmplist.append('z' + str(i))
            else:
                tmplist.append(' + z' + str(i))
            i+=1
        #print(tmplist)
        for i, name in enumerate(tmplist): #インデックスと要素を同時取得
            if i==0:
                f.write('    val_myfunc = c + ' + name)
            elif (i%10 != 0) or (i == 1): 
                f.write(name)
            else:
                f.write(name + ' \\' + '\n')
                f.write('            ')
        f.write('\n' + '    return val_myfunc' + '\n\n')
        f.write('if __name__ == \'__main__\':' + '\n')
        f.write('    val = ' + title + '_func(')
        f.writelines(my_features)
        f.write(')\n')
        f.write('    print(val)' + '\n')

######################################## 指定ファイルを複数一括削除
def my_del_files():
    for f1 in glob.glob('*.png'):
        os.remove(f1)
    for f2 in glob.glob('*interaction*'):
        os.remove(f2)

######################################## メイン関数
def main():    
    #check_flag, check_name, check_duplicates_columns = check_list
    for target_Yname in target_Yname_list:
        mytitle1 = 'none_interaction_ver'
        mytitle2 = 'interaction_ver'
        check_column = (check_flag, target_Yname, check_name)
        
        # データセットからデータ抽出
        X1, Y, my_feature_names1 = my_load_csv(path, check_column, check_duplicates_columns,
                                               start_row, input_columns, out_start_column,
                                               target_Yname, target_value2ln)

        # 交互作用項のデータを作成
        X2, my_feature_names2 = my_InteractionData(my_feature_names1, X1)

        # 説明変数Xの前処理
        if scalerjudge == 1: # 標準化する場合
            X1 = my_StandardScaler(X1)
            X2 = my_StandardScaler(X2)
        elif scalerjudge == 2: # 正規化する場合
            X1 = my_MinMaxScaler(X1)
            X2 = my_MinMaxScaler(X2)
        else:
            pass
        X1.describe().to_csv('dataX1_describe.csv', index=True, header=True, encoding='utf-8', mode='w')
        X2.describe().to_csv('dataX2_describe.csv', index=True, header=True, encoding='utf-8', mode='w')
        
        # 重回帰分析を実行
        pred_Y1, title1 = my_LinearRegression(X1, Y, target_Yname, title=mytitle1)
        pred_Y2, title2 = my_LinearRegression(X2, Y, target_Yname, title=mytitle2)
        # グラフ化
        plot_scatter(Xname_for_graph, X1, target_Yname, Y, pred_Y1, title1)
        plot_reg(X1, target_Yname, Y, pred_Y1, title1)
        plot_scatter(Xname_for_graph, X2, target_Yname, Y, pred_Y2, title2 )
        plot_reg(X2, target_Yname, Y, pred_Y2, title2 )
        
        # モジュールの作成
        create_func_in_module(target_Yname, my_feature_names1, title=mytitle1)
        create_func_in_module(target_Yname, my_feature_names1, title=mytitle2)

########################################
if __name__ == '__main__':
    ##### 基本パラメータ #####
    path = "boston.csv" # 読み込む生データcsvファイル
    input_factors = 13
    Xname_for_graph = 'RM'
    target_Yname_list = ['PRICE'] # 目的変数Y
    
    ### 生データcsvファイルの読み込み設定
    start_row = 1 # 読み込み開始行。1行目なら1, 3行目なら3, …
    target_value2ln = False # Yを対数変換するか?
    check_flag = True # True:NaNやNoneなどの欠損値を除外する場合。False:指定文字列を残す場合
    check_name = "ok" # 上記check_flagが「False」の場合に機能する。指定文字列を残す
    check_duplicates_columns=[] # ダブりを省く列名を指定。複数可。省かない場合は[]と空リストにする
    check_list = [check_flag, check_name, check_duplicates_columns]
    
    input_columns = (0, input_factors) # 設計因子の列番号(開始, 終了)
    out_start_column = 100 # この列以降のデータについて、欠損値の処理を行う
    scalerjudge = 0 # Xデータの前処理:生データそのままの場合は0、 標準化する場合は1、正規化する場合は2

    ##### 関数呼び出し #####
    ### 初期化(前回実行時に生成されたファイルを削除)
    my_del_files()
    main()    
    print("finished")   

保存した回帰モデルの使用方法は下記リンク先参照

hk29.hatenablog.jp

 作成した回帰式の実際の運用方法については、データの蓄積と更新を繰り返すことである。交互作用項を入れた重回帰分析もしくはランダムフォレストの活用が良いように思う。

以上

<広告>