Python Keras(TensorFlow2.0)によるリカレントニューラルネットワーク,RNN

 リカレントニューラルネットワーク,RNN(Recurrent Neural Network)は、日本語で再帰ニューラルネットワークと呼び、時系列データのパターン認識、例えば、自然言語応答処理、株価予測などに活用されます。

本記事では、株価予測を例題に雛形コードを載せてます。本家のチュートリアルhttps://www.tensorflow.org/tutorials/structured_data/time_seriesを元にモデルの設定やグラフの見栄え、ハイパーパラメータを株価用に適度に調整しています。

 RNNには拡張版がいくつかあり、中でもLSTM(Long Short Term Memory )が主流で長期的な学習に幾分向いているとされます。簡単には、過去データを活用して学習するにあたり、他のNNのようにドロップアウト過学習を抑制するのではなく、tanhとhard_sigmoid関数を挟むことで適度に学習を進展させます。これにより、単なるRNNのようにすぐに学習しなくなることを防止する試みです。

 本記事では5日移動平均のデータを用いて、20日分を予測するコード例です。移動平均の計算は例えば、Python 株価データの欠損値をその前後の値で補完後、単純移動平均を算出する「pandas」 - PythonとVBAで世の中を便利にするで株価のcsvファイルを生成します。

 下図にある移動平均5日間のデータを活用した例を示します。本コードではこれに限定せず、objective_feature_index でいずれかを指定できます。

f:id:HK29:20200517205606p:plain

下図は、単変量解析の過程です。横時は学習の世代で、縦軸は損失関数とした平均絶対誤差です。右にゆく程小さい値で学習してる様子がわかります。

f:id:HK29:20200517205944p:plain

 下図はバリデーション結果のひとつです。赤×は実データで緑●は予測結果です。絶対値は兎も角、株価が上昇することは的中しました。

f:id:HK29:20200517210845p:plain

下図は、バリデーションサンプル2つ目です。これも上昇することは的中してます。

f:id:HK29:20200517211053p:plain

下図は3つ目の例です。実データの赤×はほぼ株価が変動してないのに対して、予測結果●緑は500円程上昇しており、予測は外れています。

f:id:HK29:20200517210910p:plain

次に、下図は多変量解析の結果です。これも右へゆくほど、学習してる様子がわかります。

f:id:HK29:20200517210937p:plain

 下図は、バリデーションデータによる結果のひとつです。青点は実データで、赤点が予測結果です。20日分の多変量予測です。株価が上昇する様子が的中しています。

f:id:HK29:20200517211140p:plain

下図は、バリデーションサンプル2つ目です。これも上昇することは的中してます。

f:id:HK29:20200517211247p:plain

下図は、3つ目の例です。青点は下がっているのに対して、赤点は上昇していてます。つまり、予測結果は外しました。

f:id:HK29:20200517211324p:plain

 やってみればわかりますが、ニューラルネットワークは、データ点数が多く必要なだけでなく、データ分析する範囲や、データの前処理、ハイパーパラメータの影響が高いです。非線形問題を解く可能性がある手法であるだけで、それを活用するのは簡単ではないです。

■本プログラム

#!/usr/bin/env python3
# coding: utf-8
# "https://www.tensorflow.org/tutorials/structured_data/time_series"
# https://www.tensorflow.org/api_docs/python/tf/keras/layers/BatchNormalization
import tensorflow as tf
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import datetime
now = datetime.datetime.now()
now = now.strftime("%y%m%d_")

mpl.rcParams['figure.figsize'] = (9, 6)
mpl.rcParams.update({'font.size': 14})
mpl.rcParams['axes.grid'] = True

def multivariate_data(dataset,
                      target,
                      start_index,
                      end_index,
                      history_size,
                      target_size,
                      step,
                      single_step=False):
    data = []
    labels = []
    targets = []

    start_index = start_index + history_size
    if end_index is None:
        end_index = len(dataset) - target_size

    for i in range(start_index, end_index):
        indices = range(i-history_size, i, step)
        data.append(dataset[indices])
        
        if single_step:
            labels.append(target[i+target_size])
            targets.append(target_list[i+target_size])
        else:
            labels.append(target[i:i+target_size])
            targets.append(target_list[i:i+target_size])

    return np.array(data), np.array(labels)

### 学習過程をグラフ化
def plot_train_history(history, title):
    loss = history.history['loss']
    val_loss = history.history['val_loss']

    epochs = [i for i in range(1, len(loss)+1)]

    plt.figure()
    plt.plot(epochs, loss, 'b', label='Training loss')
    plt.plot(epochs, val_loss, 'r', label='Validation loss')
    plt.ylim([0,1])
    plt.xlabel('epochs')
    plt.ylabel(myloss)
    plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.savefig(title + '.png', dpi=300)
    plt.close()

def create_time_steps(length):
    return list(range(-length, 0))

### 折れ線グラフ化(シングルステップ)
def single_step_plot(plot_data,
                     delta,
                     title,
                     i):
    labels = ['History', 'True Future', 'Model Prediction']
    marker = ['.-', 'rx', 'go']
    time_steps = create_time_steps(plot_data[0].shape[0])
    
    if delta:
      future = delta
    else:
      future = 0

    plt.title(title + ': sample ' + str(i))
    for i, x in enumerate(plot_data):
        if i:
            plot_data_y = plot_data[i] * data_std[objective_feature_index] + data_mean[objective_feature_index]
            plt.plot(future, plot_data_y, marker[i], markersize=10,
                     label=labels[i])
        else:
            plot_data_y = plot_data[i] * data_std[objective_feature_index] + data_mean[objective_feature_index]
            plt.plot(time_steps, plot_data_y.flatten(), marker[i], label=labels[i])
    plt.legend()
    plt.xlim([time_steps[0], (future+5)])
    plt.xlabel('Time-Step')
    plt.ylabel(features_considered[objective_feature_index])
    plt.tight_layout()

    return plt

### 折れ線グラフ化(マルチステップ)
def multi_step_plot(history, true_future, prediction, STEP, i):
    #plt.figure(figsize=(12, 6))
    num_in = create_time_steps(len(history))
    num_out = len(true_future)

    # 過去データ
    plot_data_y = np.array(history[:, objective_feature_index]) * data_std[objective_feature_index] + data_mean[objective_feature_index]
    plt.plot(num_in, plot_data_y, label='History')
    # 実際に起こったデータ
    plot_data_y = np.array(true_future) * data_std[objective_feature_index] + data_mean[objective_feature_index]
    plt.plot(np.arange(num_out)/STEP, plot_data_y, 'bo',
             label='True Future')
    # 予測データ
    if prediction.any():
        plot_data_y = np.array(prediction) * data_std[objective_feature_index] + data_mean[objective_feature_index]
        plt.plot(np.arange(num_out)/STEP, plot_data_y, 'ro',
                 label='Predicted Future')
    plt.xlabel('Time-Step')
    plt.ylabel(features_considered[objective_feature_index])
    plt.legend(loc='upper left')
    plt.title('Multi Step Prediction: sample ' + str(i))
    plt.tight_layout()
    plt.savefig('multi_step_plot_' + str(i) + '.png', dpi=300)
    plt.close()

### シングルステップモデル。未来を予測する
def single_step_model_func(dataset):
    print('run -> single_step_model_func')
    # 訓練データ
    x_train_single, y_train_single = multivariate_data(dataset,
                                                       dataset[:, objective_feature_index],
                                                       0,
                                                       TRAIN_SPLIT,
                                                       past_history,
                                                       future_target,
                                                       STEP,
                                                       single_step=True)
    # バリデーションデータ
    x_val_single, y_val_single = multivariate_data(    dataset,
                                                       dataset[:, objective_feature_index],
                                                       TRAIN_SPLIT,
                                                       None,
                                                       past_history,
                                                       future_target,
                                                       STEP,
                                                       single_step=True)
    print ('Single window of past history : {}'.format(x_train_single[0].shape))

    train_data_single = tf.data.Dataset.from_tensor_slices((x_train_single, y_train_single))
    train_data_single = train_data_single.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()

    val_data_single = tf.data.Dataset.from_tensor_slices((x_val_single, y_val_single))
    val_data_single = val_data_single.batch(BATCH_SIZE).repeat()

    # モデルの作成
    single_step_model = tf.keras.models.Sequential()
    single_step_model.add(tf.keras.layers.LSTM(32,
                                               input_shape=x_train_single.shape[-2:]))
    single_step_model.add(tf.keras.layers.Dense(1))
    single_step_model.compile(optimizer = myoptimizer,
                              loss = myloss,
                              metrics = ['accuracy'])

    # 学習過程をプリント表示
    for x, y in val_data_single.take(1):
        print(single_step_model.predict(x).shape)

    single_step_history = single_step_model.fit(train_data_single,
                                                epochs = EPOCHS,
                                                steps_per_epoch = EVALUATION_INTERVAL,
                                                validation_data = val_data_single,
                                                validation_steps = VAL_STEPS)
    # 学習結果をグラフで表示
    plot_train_history(single_step_history, 'Single Step Training and validation loss')
    i = 1
    for x, y in val_data_single.take(5):
        single_step_plot([x[0][:, objective_feature_index].numpy(), y[0].numpy(), single_step_model.predict(x)[0]],
                         12,
                         'Single Step Prediction',
                         i)
        plt.savefig('single_step_plot_' + str(i) + '.png', dpi=300)
        plt.close()
        i += 1

    # 学習器を保存
    single_step_model.save(now + 'single_step_model.h5')
    print(single_step_model.summary())

### マルチステップモデル。未来を多変量(複数点範囲)で予測する
def multi_step_model_func(dataset):
    print('run -> multi_step_model_func')
    x_train_multi, y_train_multi = multivariate_data(dataset,
                                                     dataset[:, objective_feature_index],
                                                     0,
                                                     TRAIN_SPLIT,
                                                     past_history,
                                                     future_target,
                                                     STEP)

    x_val_multi, y_val_multi = multivariate_data(    dataset,
                                                     dataset[:, objective_feature_index],
                                                     TRAIN_SPLIT,
                                                     None,
                                                     past_history,
                                                     future_target,
                                                     STEP)
    print ('Single window of past history : {}'.format(x_train_multi[0].shape))
    print ('\n Target to predict : {}'.format(y_train_multi[0].shape))

    train_data_multi = tf.data.Dataset.from_tensor_slices((x_train_multi, y_train_multi))
    train_data_multi = train_data_multi.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()

    val_data_multi = tf.data.Dataset.from_tensor_slices((x_val_multi, y_val_multi))
    val_data_multi = val_data_multi.batch(BATCH_SIZE).repeat()

    # モデルの作成
    multi_step_model = tf.keras.models.Sequential()
    multi_step_model.add(tf.keras.layers.LSTM(units = 32,
                                              activation='tanh',
                                              recurrent_activation='hard_sigmoid',
                                              use_bias=True,
                                              kernel_initializer='glorot_uniform',
                                              recurrent_initializer='orthogonal',
                                              bias_initializer='zeros',
                                              dropout=0.0,
                                              recurrent_dropout=0.0,
                                              implementation=2,
                                              return_sequences=True, # 完全な系列を返すか
                                              kernel_regularizer=tf.keras.regularizers.l2(weight_decay),
                                              #kernel_regularizer=tf.keras.regularizers.l1_l2(l1=weight_decay, l2=weight_decay),
                                              input_shape=x_train_multi.shape[-2:]))
    multi_step_model.add(tf.keras.layers.LSTM(16, activation='relu'))
    multi_step_model.add(tf.keras.layers.Dense(future_target))
    multi_step_model.compile(optimizer = myoptimizer,
                             loss = myloss,
                             metrics = ['accuracy'])

    # 学習過程をプリント表示
    for x, y in val_data_multi.take(1):
        print (multi_step_model.predict(x).shape)

    multi_step_history = multi_step_model.fit(train_data_multi,
                                              epochs = EPOCHS,
                                              steps_per_epoch = EVALUATION_INTERVAL,
                                              validation_data = val_data_multi,
                                              validation_steps = VAL_STEPS)
    # 学習結果をグラフで表示
    plot_train_history(multi_step_history, 'Multi-Step Training and validation loss')
    i = 1
    for x, y in val_data_multi.take(5):
        multi_step_plot(x[0], y[0], multi_step_model.predict(x)[0], STEP, i)
        i += 1
        
    # 学習器を保存
    multi_step_model.save(now + 'multi_step_model.h5')
    print(multi_step_model.summary())

### メイン関数
def main():
    global target_list, data_mean, data_std

    tf.random.set_seed(7) # 乱数シードを指定(固定)

    features = df[features_considered] # 多変量(複数の特徴量)を元に解析する。リストで設定する
    target_list = list(df[target]) # インデックスをリスト化
    features.index = df[target] # 横軸にする時系列データをPandasデータフレームでインデックスに設定
    print(features.head()) # 解析するデータの最初の5行を目視確認
    features.plot(subplots=False, grid=True) # グラフで確認
    plt.savefig('features_plot.png', dpi=300)
    plt.close()

    dataset = features.values # 説明変数データをnumpy配列で抽出
    data_mean = dataset[:TRAIN_SPLIT].mean(axis=0) # 平均値
    data_std = dataset[:TRAIN_SPLIT].std(axis=0) # 標準偏差
    dataset = (dataset-data_mean)/data_std # データを標準化

    ### シングルテップモデル。未来の値ひとつを予測する
    single_step_model_func(dataset)

    ### マルチステップモデル。未来の値の範囲を予測する
    multi_step_model_func(dataset)


### パラメータの設定
if __name__ == '__main__':
    # テンソルフローのバージョンを表示
    print('tensorflow ver', tf.__version__)
    
    # 読み出すcsvファイル
    csv_path = 'N225_10years_04_done.csv' 
    # Pandasデータフレーム形式で読む
    df = pd.read_csv(csv_path)
    # 訓練データに使用する全体データの割合
    train_ratio = 0.80
    # 訓練データ数を計算
    TRAIN_SPLIT = int(df.shape[0] * train_ratio)
    print('TRAIN_SPLIT:', TRAIN_SPLIT)

    # 時系列の列名
    target = 'Date'
    # 複数列データから解析する場合
    features_considered = ['Close', 'moving_average_5', 'moving_average_25', 'moving_average_75'] 
    # 予測する目的変数のインデックスを指定
    objective_feature_index = 1

    # 予測する未来の数
    future_target = 20
    # 過去参照するデータ数:直近のこのデータ数分を元に、未来を予測する
    past_history = future_target * 3
    # データ行をスキップする数。スキップしない場合は1(データをスキップせず使用する)
    STEP = 1

    # バッチサイズ:一度に解析するデータの塊。
    BATCH_SIZE = past_history
    print('BATCH_SIZE:', BATCH_SIZE)
    # 指定したバッファ(値)をシャッフルして得た番号で、バッチサイズ分を訓練データから抽出して解析する
    BUFFER_SIZE = int(TRAIN_SPLIT * train_ratio)
    
    #各世代の試行回数:「訓練データ÷バッチサイズ」の商以下が目安
    EVALUATION_INTERVAL = TRAIN_SPLIT // BATCH_SIZE
    print('EVALUATION_INTERVAL:', EVALUATION_INTERVAL)
    #バリデーションの試行回数:「バリデーションデータ÷バッチサイズ」の商以下が目安
    VAL_STEPS = (TRAIN_SPLIT*(1-train_ratio)/train_ratio) // BATCH_SIZE
    print('VAL_STEPS:', VAL_STEPS)
    # 学習する世代回数
    EPOCHS = 10

    # オプティマイザの設定
    myoptimizer = tf.keras.optimizers.Adam(lr=0.001, # 学習率
                                           beta_1=0.9,
                                           beta_2=0.999,
                                           epsilon=None,
                                           decay=0.0,
                                           amsgrad=False)
    # 指標とする損失関数
    myloss='mean_absolute_error' # mean_squared_error  mean_absolute_error
    # 重みの正則化
    weight_decay = 1e-3

    # メイン関数を実行
    main()
    print('finished')

以上

<広告>