Python Linuxのターミナルでコマンド実行した結果を読む方法

'20/03/02更新:コードの可読性の向上。
はじめに、下図はLinuxのターミナルで、コマンド $ ps aux を実行した例です。

f:id:HK29:20191024004531p:plain

本記事では、上記のようなコマンドラインで実行した結果がスタックであるようなデータを下図のような横棒グラフを作成して可視化る処理を目的とします。

f:id:HK29:20191024002920p:plain

本記事の特筆することは次の4つである。
■1. コマンドライン実行結果をファイルオブジェクトで扱うことで、メモリ上でデータ処理できる
→方法:ioをインポートして使用する。f = io.StringIO(runcmd.decode("utf-8"))
 この時、カッコ内でデコードする必要がある。

■2. 横棒グラフにデータラベルを載せる
→方法:plt.barh(range(len(df_x)), df_y, tick_label=df_x, align="center", color="magenta", height=0.8
第一、第二引数は「座標」を表す

■3. Pandasでファイル読み込み時に列数が途中で異なる(増える)ために生じるエラー
pandas.errors.ParserError: Error tokenizing data. C error:
→方法:ダミー列名を作成して既存列名の後ろに追記する。
 ダミーは内包表記で作成するのがわかりやすい。そして多めに設定する。例では55個追記。col_names2 = (["dummy{:02d}".format(i) for i in range(55)]) # dummy columns list

■4. スタックデータの列方向の数を数える
→方法:df5 = DF.groupby([check_column]).size()
 sizeの後ろのカッコ。付け忘れるとメソッド共通のエラーでファンクションがどうたらというエラーが出る。AttributeError: 'function' object has no attribute …

 ▼本プログラム
if __name__ == '__main__':
下にあるパラメータで、入力コマンドと着目する列名を設定できる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os, glob
import io, subprocess
import pandas as pd
import numpy as np
#import matplotlib as mpl
#mpl.use('Agg')
import matplotlib.pyplot as plt

import datetime now = datetime.datetime.now() now = now.strftime("%y%m%d_%H%M%S") class Exe_cmd2csv: def __init__(self, my_cmd, my_header): self.cmd = my_cmd runcmd = subprocess.check_output(my_cmd.split()) f = io.StringIO(runcmd.decode("utf-8")) for i, e in enumerate(f): col_name = e if my_header == i: break col_names = col_name.split() # to list col_names2 = (["dummy{:02d}".format(i) for i in range(55)]) # dummy columns list col_names.extend(col_names2) # concat print('col_names', col_names) self.df = pd.read_csv(io.StringIO(runcmd.decode("utf-8")), skiprows=(my_header+1), names=col_names) cmd_list = my_cmd.split() fname = "" for e in cmd_list: fname += e self.my_file = fname + '.csv' self.df.to_csv(self.my_file, header=True, index=False) self.out_file = self.my_file[:-4] with open(self.my_file, "r") as f1: for i, row in enumerate(f1): buf = [] if i == my_header: buf = row.strip() with open(self.out_file + '2.csv', "a") as f2: f2.write(buf) elif i > my_header: buf = ','.join(row.split()) # string with open(self.out_file + '2.csv', "a") as f2: f2.write(buf + '\n') else:pass self.df2 = pd.read_csv(self.out_file + '2.csv', sep=',', header=0, encoding="utf_8") def doropna2csv(self, check_column): df3 = self.df2.dropna(subset=[check_column]) df3 = df3.dropna(axis=1) df3.to_csv(self.out_file + '3_doropna.csv', header=True, index=False) return df3 def dorop_dupli2csv(self, check_column): df4 = self.df2.drop_duplicates(check_column, keep='first') df4.to_csv(self.out_file + '4_drop_dupli.csv', header=True, index=False) return df4 def count_in_special_column(self, DF, check_column): df5 = DF.groupby([check_column]).size() print(df5) #Series df5 = df5.reset_index(name='count') print(df5) # DataFrame print("sort") df5 = df5.sort_values(by='count', ascending=False) df5.to_csv(self.out_file + '5_sort.csv', header=True, index=False) print(df5) # sort # sort for graph df5 = df5.sort_values(by='count', ascending=True) return df5 def xy_plot(self, DF, X, Y, range_X): print(DF) df_x = DF[X] df_y = DF[Y] y_np = np.array(df_y) plt.figure(figsize=(8,12)) plt.barh(range(len(df_x)), df_y, tick_label=df_x, align="center", color="magenta", height=0.8) for i, j in enumerate(y_np): plt.text(j, (i+0.5), str(int(j)), ha='left', va='top') plt.title(self.cmd) plt.xlabel('count', fontsize=12) if range_X: plt.xlim([range_X[0], range_X[1]]) plt.grid(which="major", axis="x", color="black", alpha=0.8, linestyle="-", linewidth=1) #plt.tight_layout() plt.show() pic_name = now + "_" + self.out_file + ".png" plt.savefig(pic_name) def my_del_files(): for f1 in glob.glob('*.png'): os.remove(f1) for f2 in glob.glob('*.csv'): os.remove(f2) def run(cmd, my_header, check_column, range_x=()): myinstance = Exe_cmd2csv(cmd, my_header) if range_x: dfA = myinstance.doropna2csv(check_column) dfB = myinstance.count_in_special_column(dfA, check_column) myinstance.xy_plot(dfB, check_column, 'count', range_x) else: dfC = myinstance.dorop_dupli2csv(check_column) print(dfC.iloc[:, 0:5]) def main(): ### initialize my_del_files() cmd = "ps aux" my_header = 0 check_column = "USER" range_x =(0, 100) # (min, max) run(cmd, my_header, check_column, range_x) cmd = "finger" my_header = 0 check_column = "Login" run(cmd, my_header, check_column) if __name__ == '__main__': main() print("finished")

以上

<広告>