Python 多目的最適化「Platypus」のスクリプトをアルゴリズム別に作成する

# '20/02/16更新:パレート解に判断基準線を加えることができるようにした。
 多目的最適化のフレームワーク「Platypus」については以前に述べた→Python 多目的最適化「Platypus」:自作モジュールの回帰モデルを読む場合 - HK29’s blog

 本記事では、これを複数のアルゴリズム別に結果を出力するような仕組みで作成した。最適化の過程をcsvファイルで出力して、また行列散布図としても保存する。また、縦横軸を目的関数にパレート解も出力する。

f:id:HK29:20200118183949p:plain

得られる結果例を先に示す。例えば、下図はアルゴリズム「IBEA」による多目的最適化の過程で各世代で得られた結果を全数プロットした行列散布図である。更にその下には他のアルゴリズム「NSGAⅡ」,「SPEA2」の結果である。マジマジと比較すると、探索過程が異なっている様子がわかる。

f:id:HK29:20200118183602p:plain

下図はアルゴリズム「NSGAⅡ」による探索結果である。

f:id:HK29:20200118183618p:plain

下図はアルゴリズム「SPEA2」による探索結果である。

f:id:HK29:20200118183633p:plain

下図のようにパレート解も出力する。指定したアルゴリズム分を画像ファイルだけでなく、設計変数と共にcsvファイルに保存する仕様です。

f:id:HK29:20200216043847p:plain

##### 以降に、本記事にあるコードを実行する手順について述べる。

まず、準備するのは、例えば次の5つのファイルを作成する。

f:id:HK29:20200118181321p:plain

■作業1. 回帰モデル(式)を作成する

多目的なので2つ以上の目的関数を作成する。
「Pulse_LinearRegression_interaction_ver_module.py」,「Waist_LinearRegression_interaction_ver_module.py」,「Weight_LinearRegression_interaction_ver_module.py」は次の2つのリンク先を参考に作成する。

Python scikit-learn付属の多目的用のデータ「Linnerud dataset」をcsvファイル化する - HK29’s blog

Python scikit-learnによる交互作用の項を考慮した重回帰分析 - HK29’s blog

下図はそのファイルモジュールの一例です。説明変数を引数に、目的変数を返す関数になっています。

f:id:HK29:20200118182002p:plain

作業2. 多目的最適化スクリプトの元になるファイルを作成する

下記コードを例えば「zzz_original.txt」として保存する。コード中にある@マークで囲まれた箇所を更にその下で示す本コードを実行することで、アルゴリズム別に設定条件を置換して、アルゴリズム別に実行ファイル「.py」を作成する仕様です。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# https://platypus.readthedocs.io/en/latest/experimenter.html#comparing-algorithms-visually

@MY_01_IMPORT_MODULE@

from platypus import *
import pandas as pd
import numpy as np
import math

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import japanize_matplotlib

# 目的関数を作成して返す関数:自作モジュールを読み出す場合
def my_function_from_module(vars):
@MY_02_VARS@

@MY_03_OBJECTIVE_FUNCTION@

@MY_04_RETURN@

# パレート解を図示する関数
def plot_scatter(algorithm, out_file_name, my_color):
    my_file_name = out_file_name + '_' + my_algorithm_name
@MY_05_XY_FOR_GRAPH@

    fig = plt.figure(figsize=(8,8)) #dpi=120)
    ax = fig.add_subplot(1,1,1)
    ax.set_title('多目的最適化 ' + my_algorithm_name, fontsize=14)

    ax.set_xlabel(my_xlabel, fontsize=16)
    ax.set_ylabel(my_ylabel, fontsize=16)
    if my_xrange:
        ax.set_xlim([my_xrange[0], my_xrange[1]])
    if my_yrange:    
        ax.set_ylim([my_yrange[0], my_yrange[1]])
    if my_xjugdeline and my_yrange:
        ax.vlines(x=my_xjugdeline, ymin=my_yrange[0], ymax=my_yrange[1],
                  colors='r', linewidths=1, linestyles='dashed')
    if my_yjugdeline and my_xrange:
        ax.hlines(y=my_yjugdeline, xmin=my_xrange[0], xmax=my_xrange[1],
                  colors='r', linewidths=1, linestyles='dashed')
    ax.scatter(x_list, y_list, facecolors='none', edgecolors=my_color, label="パレート解")
    ax.legend(loc='best', fontsize=14)
    ax.grid(which='both')
    ax.tick_params(labelsize=14)
    plt.tight_layout()
    plt.savefig(my_file_name + '_Pareto_frontier.png')
    plt.close()

# 行列散布図を作成する関数
def plot_matrix_scatter(label, DF, my_sequential_colormap):
    print("plot_matrix_scatter ...")
    sns.set(style="ticks", font_scale=1.2, palette=my_sequential_colormap, color_codes=True, font="IPAexGothic")
    g = sns.pairplot(DF, diag_kind="hist")
    g.fig.suptitle(label)
    g.fig.subplots_adjust(top=0.9)
    plt.grid(True)
    plt.savefig(label + "_Matrix_scatter.png")
    plt.close()

# 最適化の過程をcsvファイルに出力する関数(この中で行列散布図の作成する関数も呼び出す)
#def create_Summary_data(Chins,
#                        Situps,
#                        Jumps,
@MY_06_CREATE_SUMMARY_FUNCTION@
                        algorithm, my_sequential_colormap):
    my_file_name = out_file_name + '_' + my_algorithm_name 

@MY_07_COLUMN_NAME_LIST@
    column_name_list = ','.join(column_name_list)
    row_list=[]
    with open(my_file_name + '.csv', 'w') as f:
        f.writelines(column_name_list)
        f.write('\n')
        for i in range(len(algorithm.result)):
@MY_08_ROW_LIST_APPEND@
@MY_09_OUT_INVERSE_TRANSFORMATION@
            print(my_algorithm_name + ' epoch' +str(i+1) + ' -> ' + str(row_list))
            row_list_str = ','.join(map(str, row_list))
            f.writelines(row_list_str)
            f.write('\n')
            row_list=[]
    df = pd.read_csv(my_file_name + '.csv')
    plt.figure(figsize=(12,12))
    plot_matrix_scatter('多目的最適化' + my_algorithm_name, df, my_sequential_colormap)
    plt.close()

# メイン関数:platypusによる多目的最適化
def main():
@MY_10_PROBLEM_SET@
@MY_11_PROBLEM_DIRECTION@

@MY_12_INPUT_FACTOR_RANGE@

@MY_13_PROBLEM_TYPE@
@MY_14_PROBLEM_FUNCTION@

@MY_15_ALGORITHM@
    algorithm.run(10000) # evaluation number
    print(algorithm)
    plot_scatter(algorithm, out_file_name, 'r')
@MY_16_CALL_CREATE_SUMMARY_FUNCTION@

# パラメータの設定
if __name__ == '__main__':
    # parameter
    out_file_name = 'linnerud'
    my_xlabel = '体重' #'Weigth'
    my_xscale = 'linear'
    my_xrange = (0, 700) # グラフのレンジを設定。空タプル()でAuto
    my_xjugdeline = 300
    my_ylabel = '腹囲' #'Waist'
    my_yscale = 'linear'
    my_yrange = (0, 100) # グラフのレンジを設定。(最小値, 最大値)
    my_yjugdeline = 60
@MY_17_ALGORITHM_NAME@

    #my_colors = ["r", "g", "b", "c", "m", "y", "k", "w"]
    my_sequential_colormaps = ["spring", "summer", "autumn", "winter", 'bone',
                            "copper", "plasma", "magma", "cividis", "hsv"]
        
    # call function
@MY_18_CALL_MAIN@
    
    print('finished')

■作業3. 本プログラムを作成する

最後に、下記コードを例えば「zzz_create_platypus.py」として保存して、実行すれば良い。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# https://platypus.readthedocs.io/en/latest/getting-started.html#defining-unconstrained-problems
import os, glob, subprocess
import datetime, time
now = datetime.datetime.now()
now = now.strftime("%y%m%d")

def zero_indent_loop_func(my_list):
    buf = []
    for i, my_element in enumerate(my_list):
        buf.append(my_element + '\n')
        
    return buf

def vars_func(my_02_input_factors_list):
    buf = []
    for i, my_factor in enumerate(my_02_input_factors_list):
        buf.append('    ' + my_factor + ' = vars[' + str(i) + ']' + '\n')
    
    return buf

def four_indent_loop_func(my_list):
    buf = []
    for i, my_element in enumerate(my_list):
        buf.append('    ' + my_element + '\n')
        
    return buf

def object_function_return_func(my_04_objective_function_return_list):
    buf = []
    for i, my_objective_function_return in enumerate(my_04_objective_function_return_list):
        if i == 0:
            buf.append('    return [' + my_objective_function_return + ',' + '\n')
        elif i == (len(my_04_objective_function_return_list)-1):
            buf.append('            ' + my_objective_function_return + ']' + '\n')
        else:
            buf.append('            ' + my_objective_function_return + ',' + '\n')
    
    return buf

def column_xy_func(my_07_column_xy_list):
    buf = []
    my_column_xy_list_str = '","'.join(my_07_column_xy_list)
    my_column_xy_list_str = '["' + my_column_xy_list_str + '"]'
    buf.append('    column_name_list = ' + str(my_column_xy_list_str) + '\n')
    
    return buf

def row_list_append_for_output_func(my_02_input_factors_list):
    buf = []
    for i, my_input_factor in enumerate(my_02_input_factors_list):
        buf.append('            row_list.append(' + my_input_factor + '.decode(algorithm.result[i].variables[' + str(i) + ']))' + '\n')
    buf.append('            row_list.extend(algorithm.result[i].objectives[:])' + '\n')

    return buf

def twelve_indent_loop_func(my_list):
    buf = []
    for i, my_element in enumerate(my_list):
        buf.append('            ' + my_element + '\n')
        
    return buf

def create_file(replace_set, replace_list):
    target_file, new_file, target_word = replace_set # アンパック代入
    tmp_list=[]
    flag=0
    with open(replace_set[0], "r", encoding='utf-8') as f1:
        k=0
        for row in f1:
            if row.find(replace_set[2]) != -1:
                flag = k
                #print('flag -> ' + str(flag))
                tmp_list.append(replace_list)
                #print(tmp_list[flag])
            else:
                tmp_list.append(row)
            k=k+1
    if replace_set[0]==replace_set[1]: # 入力ファイル名と出力ファイル名を同じに指定した場合。上書きエラーを回避
        os.remove(replace_set[0])
            
    with open(replace_set[1], "w", encoding='utf-8') as f2:
        for i in range(len(tmp_list)):
            if i != flag:
                # 文字列を書く場合
                f2.write(str(tmp_list[i]))
            else:
                # リストの各要素を全て書き出す場合。ちなみに、要素毎に改行する場合は各要素に改行コードを入れる必要あり。
                f2.writelines(tmp_list[i])

# メイン関数:platypusフレームワークによる多目的最適化
def main():
    cnt=0
    for my_algorithm_name, my_algorithm_setting, in algorithm_dict.items():
        new_filename = now + '_platypus_' + my_algorithm_name + '.py'
        print(new_filename)
        replace_set_list = []
        for i, replace_target in enumerate(replace_target_list):
            if i==0:
                my_tuple = (original_filename, new_filename, replace_target)
                replace_set_list.append(my_tuple)
            else:
                my_tuple = (new_filename, new_filename, replace_target)
                replace_set_list.append(my_tuple)
        my_01_import_module_list = ["import sys",
                                    "sys.path.append('.')",
                                    "import Weight_LinearRegression_interaction_ver_module as myfunc1",
                                    "import Waist_LinearRegression_interaction_ver_module as myfunc2",
                                    "import Pulse_LinearRegression_interaction_ver_module as myfunc3"]
        #my_01_import_module_list = ["from sklearn.linear_model import LinearRegression"]
        ###my_01_import_module_list = ["from sklearn.ensemble import RandomForestRegressor"]
        ###my_01_import_module_list = ["from sklearn import neural_network"]
        
        my_02_input_factors_list = ["Chins",
                                    "Situps",
                                    "Jumps"]
        
        my_03_objective_function_list = ["objective_function1 = (-1) * myfunc1.interaction_ver_func(Chins, Situps, Jumps)",
                                         "objective_function2 = myfunc2.interaction_ver_func(Chins, Situps, Jumps)",
                                         "objective_function3 = myfunc3.interaction_ver_func(Chins, Situps, Jumps)"]

        '''
        my_03_objective_function_list = ["X_np = np.array([[Chins, Situps, Jumps]])",
                                         "X_df = pd.DataFrame(X_np, columns = ['Chins', 'Situps', 'Jumps'])",
                                         "pred_Y1 = regr1.predict(X_df)",
                                         "pred_Y2 = regr1.predict(X_df)",
                                         "pred_Y3 = regr1.predict(X_df)",
                                         "objective_function3 = myfunc3.interaction_ver_func(Chins, Situps, Jumps)"]
        '''
        
        my_04_objective_function_return_list = ["objective_function1",
                                                "objective_function2",
                                                "objective_function3"]
        
        '''
        my_04_objective_function_return_list = ["pred_Y1[0]",
                                                "pred_Y2[0]",
                                                "pred_Y3[0]"]
        '''

        
        my_05_xy_for_graph_list = ["x_list = [(-1) * s.objectives[0] for s in algorithm.result]",
                                   "y_list = [s.objectives[1] for s in algorithm.result]"]
        
        my_06_create_summary_function_list = ["def create_Summary_data(Chins, Situps, Jumps,"]
        
        my_multi_purpose_list = ["Weight",
                                 "Waist",
                                 "Pulse"]
        
        my_07_column_xy_list = my_02_input_factors_list + my_multi_purpose_list
        
        my_08_row_list_append_list = my_02_input_factors_list
        
        my_09_out_inverse_transformation_list = ["row_list[-3] = row_list[-3]",
                                                 "row_list[-2] = row_list[-2]",
                                                 "row_list[-1] = (-1) * row_list[-1]"]
        
        my_10_problem_set_list = ["problem = Problem(3, 3)"]
        
        my_11_problem_directions_list = ["problem.directions[:] = [Problem.MINIMIZE, Problem.MINIMIZE, Problem.MINIMIZE]"]
        
        my_12_input_factors_range_list = ["Chins = Integer(1, 17)",
                                          "Situps = Integer(50, 250)",
                                          "Jumps = Integer(25, 250)"]

        my_13_problem_set_list = ["problem.types[:] = [Chins, Situps, Jumps]"]

        my_14_problem_function_list = ["problem.function = my_function_from_module"]
        
        my_15_problem_algorithm_list = ["algorithm = " + my_algorithm_setting]
        
        my_16_create_summary_list = ["create_Summary_data(Chins, Situps, Jumps, algorithm, my_sequential_colormaps[" + str(cnt) + "])"]
        
        my_17_algorithm_name_list = ["my_algorithm_name = '" + my_algorithm_name + "'"]
        
        my_18_main_list = ["main()"]
               
        replace_sentence_jag_list = []
        replace_sentence_jag_list.append(zero_indent_loop_func(my_01_import_module_list))        
        replace_sentence_jag_list.append(vars_func(my_02_input_factors_list))
        replace_sentence_jag_list.append(four_indent_loop_func(my_03_objective_function_list))
        replace_sentence_jag_list.append(object_function_return_func(my_04_objective_function_return_list))        
        replace_sentence_jag_list.append(four_indent_loop_func(my_05_xy_for_graph_list))
        replace_sentence_jag_list.append(zero_indent_loop_func(my_06_create_summary_function_list))
        replace_sentence_jag_list.append(column_xy_func(my_07_column_xy_list))
        replace_sentence_jag_list.append(row_list_append_for_output_func(my_08_row_list_append_list))        
        replace_sentence_jag_list.append(twelve_indent_loop_func(my_09_out_inverse_transformation_list))
        replace_sentence_jag_list.append(four_indent_loop_func(my_10_problem_set_list))
        replace_sentence_jag_list.append(four_indent_loop_func(my_11_problem_directions_list))
        replace_sentence_jag_list.append(four_indent_loop_func(my_12_input_factors_range_list))
        replace_sentence_jag_list.append(four_indent_loop_func(my_13_problem_set_list))        
        replace_sentence_jag_list.append(four_indent_loop_func(my_14_problem_function_list))
        replace_sentence_jag_list.append(four_indent_loop_func(my_15_problem_algorithm_list))
        replace_sentence_jag_list.append(four_indent_loop_func(my_16_create_summary_list))        
        replace_sentence_jag_list.append(four_indent_loop_func(my_17_algorithm_name_list))
        replace_sentence_jag_list.append(four_indent_loop_func(my_18_main_list))
        
        for replace_set_list, replace_sentence_list in zip(replace_set_list, replace_sentence_jag_list):
            create_file(replace_set_list, replace_sentence_list)
        cnt += 1

    pyfiles_list = glob.glob(now + '_platypus*')
    for n, pyfile in enumerate(pyfiles_list):
        print(str(n+1) + "/" + str(len(pyfiles_list)))
        print("python " + pyfile + ' ...')
        start_time = time.time()
        my_cmd = "python " + pyfile
        
        runcmd = subprocess.call(my_cmd.split()) # 正常に実行されれば0を返す
        '''
        p = subprocess.Popen(my_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        for line in iter(p.stdout.readline, b''):
            print(line.rstrip().decode("utf-8"))
        '''
        analysis_time = time.time() - start_time
        print('time -> ' + "{0:.1f}".format(analysis_time) + 's')
        if runcmd == 0:
            print('analysis ok')
        else:
            print('something wrong')
        
# パラメータの設定
if __name__ == '__main__':
    # parameter
    original_filename = 'zzz_original.txt'
    my_population_size = 200
    algorithm_dict = {'NSGAII':'NSGAII(problem, population_size=' + str(my_population_size) + ")",
                      'NSGAIII':'NSGAIII(problem, 12)',
                      'IBEA':'IBEA(problem, population_size=' + str(my_population_size) + ")",
                      'SPEA2':'SPEA2(problem, population_size=' + str(my_population_size) + ")",
                      }
    '''
                      'MOEAD':'MOEAD(problem, weight_generator=normal_boundary_weights, divisions_outer=12)',
                      'PAES':'PAES(problem)',
                      'EpsMOEA':'EpsMOEA(problem, 0.05, population_size=' + str(my_population_size) + ")"}
    '''
    
    replace_target_list = ['@MY_01_IMPORT_MODULE@',
                           '@MY_02_VARS@',
                           '@MY_03_OBJECTIVE_FUNCTION@',
                           '@MY_04_RETURN@',
                           '@MY_05_XY_FOR_GRAPH@',
                           '@MY_06_CREATE_SUMMARY_FUNCTION@',
                           '@MY_07_COLUMN_NAME_LIST@',
                           '@MY_08_ROW_LIST_APPEND@',
                           '@MY_09_OUT_INVERSE_TRANSFORMATION@',
                           '@MY_10_PROBLEM_SET@',
                           '@MY_11_PROBLEM_DIRECTION@',
                           '@MY_12_INPUT_FACTOR_RANGE@',
                           '@MY_13_PROBLEM_TYPE@',
                           '@MY_14_PROBLEM_FUNCTION@',
                           '@MY_15_ALGORITHM@',
                           '@MY_16_CALL_CREATE_SUMMARY_FUNCTION@',
                           '@MY_17_ALGORITHM_NAME@',
                           '@MY_18_CALL_MAIN@']
    
    main()
    
    print('finished')

 (備考)
設計変数と目的変数の記述場所をまとめておくことで、比較的再利用し易くした。設計変数や目的関数を変更したい場合は、my_01_*** ~my_18_*** を編集すれば良い。

以上

<広告>