Python 高速フーリエ変換(FFT)による周波数解析「SciPy」

 高速フーリエ変換FFT:Fast Fourier Transform)は、離散フーリエ変換を高速に解くアルゴリズムのことです。

 本記事では、合成波から個々の周波数を抽出する雛形コードです。以前、記事にしたPython ねこふんじゃったを演奏する「PyAudio」 - HK29’s blogで作成した「じゃ」の音は、「シ:246.941」と「ソ:391.995」の2つの和音です。これをフーリエ変換によって、周波数を抽出する方法を例としました。

■以降、本プログラムで出来ること(実行した結果)を示します。
下図中の上は、上記リンク先で出力した和音のcsvファイルを読み込んだ結果です。周期的に合成波が繰り返されていることがわかります。下図中の下は、そのデータに窓関数を掛けた結果です。両端がゼロに収束するように変換した様子がわかります。この処理は、周波数解析する場合に、分析データ区間の開始と終了を切り出しによる影響を極力排除するための処置です。

f:id:HK29:20200314183554p:plain

下図は、窓関数を掛けずにフーリエ変換した場合の結果です。下図下の赤線は、下図上の黒線の横軸のレンジを絞っただけです。強度を表す縦軸のピーク位置が250Hz付近と400Hz付近にあることがわかります。これは、冒頭で述べた「シ:246.941」と「ソ:391.995」の周波数を解析できたことを意味します。

f:id:HK29:20200314184224p:plain

下図は窓関数を掛けて、フーリエ変換した結果です。これも、ほぼ正確に解析できています。注目すべきは、縦軸の強度が0.2と約半分に減少しています。窓関数を掛けて、両端の振幅が減少したことによるものです。

f:id:HK29:20200314184237p:plain

次に、元のcsvファイルの音波データから、適当に範囲を絞ってcsvファイルを作成して分析しました。その生データが下図中の上で、周期に無関係な中途半端な箇所で両端が切れてる様子がわかります。そのデータに窓関数を掛けたのが、下図中の下です。中央データはそのままに、両端データはゼロに収束している様子がわかります。

f:id:HK29:20200314184848p:plain

下図は、上図中の上のデータをフーリエ変換した結果です。一応、「シ:246.941」と「ソ:391.995」は解析できていますが、先の結果よりも値のずれが10程大きいように見えます。

f:id:HK29:20200314185339p:plain

下図は、窓関数を掛けたデータをフーリエ変換した結果です。これも一応分析できていますが、強度も約半分になっています。このような場合、強度が様々なデータをフーリエ変換による周波数解析をしたい場合は、識別が困難になってくることが想像できます。

f:id:HK29:20200314185430p:plain

■本プログラム

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os, glob
import numpy as np
import pandas as pd
from scipy import signal
from scipy import fftpack
#import matplotlib
#matplotlib.use('Agg')
import matplotlib.pyplot as plt

### ファイルを読み込んでnumpyのnd array型とデータ数(行数)を返す
def load_file(file_path):
    # numpyアレイ型で読み込み
    data_np = np.loadtxt(file_path, # ファイルパス
                      delimiter=",", # 区切り文字
                      skiprows=0, # 読み込み開始行
                      usecols=(0) # 読み込む列番号。複数の場合の例:(0,1,2)
                      )

    return data_np

### 窓関数を掛ける
def windowing(file_name, curve_data_np):
    N = curve_data_np.shape[0]

    #window_func = signal.hann(N)
    #window_func = signal.hamming(N)
    window_func = signal.blackman(N)

    curve_data_np_after_windowing = curve_data_np * window_func

    # グラフ化
    plt.subplot(211)
    plt.plot(curve_data_np,label="raw curve", c='k')
    plt.legend(loc=1)
    plt.title(file_name)

    plt.subplot(212)
    plt.plot(curve_data_np_after_windowing,label="after windowing", c='b')
    plt.legend(loc=1)
    plt.xlabel("time")

    plt.savefig(file_name + '_00_original_vs_windowing.png') 
    plt.close()
    
    # numpy型をcsvファイルに出力
    '''
    np.savetxt(file_name + '_windowing.csv',
               curve_data_np_after_windowing,
               delimiter=',',
               fmt='%.18e')
    '''

    return curve_data_np_after_windowing, window_func

### 高速フーリエ変換(FFT:Fast Fourier Transform)
def fft(file_name, curve_data_np):
    N = curve_data_np.shape[0]

    freq = fftpack.fftfreq(n=N, d=1/fs)
    yf = fftpack.fft(curve_data_np)/(N/2)
    yf_abs = np.abs(yf) # 虚数部を含めた強さ
    
    # グラフ化
    plt.subplot(211)
    # 対称性より半分のみプロット
    plt.plot(freq[:N//2], yf_abs[:N//2], label="FFT:Fast Fourier Transform", c='k')
    plt.legend(loc=1)
    plt.title(file_name)

    # numpyのnd Array型のX,Yデータ列の結合
    FFT = np.c_[freq, yf_abs]
    # pandasのデータフレーム型へ変換
    df = pd.DataFrame(FFT, columns=['x', 'y'])
    extract_df = df.query('0 < x <= 500') # 周波数0~500Hz区間のデータを抽出

    # numpyのnd array型へ戻す(変換)
    extract_FFT = extract_df.values.T # .Tは転置の意
    #np.savetxt(file_name + '_out.tsv', extract_FFT, delimiter='\t')

    plt.subplot(212)    
    plt.plot(extract_FFT[0], extract_FFT[1], label="Enlarged view", c='r')
    plt.legend(loc=1)
    plt.xlabel("frequency [Hz]")

    plt.savefig(file_name + '_fft.png') 
    plt.close()

    return yf_abs

### メイン関数
def main():
    for file in file_list:
        file_name = os.path.basename(file)[:-4]
        
        # csvファイルの波形データをnumpy型で取得
        curve_data_np = load_file(file)
        # フーリエ変換
        FFT = fft(file_name, curve_data_np)

        # 窓関数を掛けたデータを作成
        curve_data_np_after_windowing, window_func = windowing(file_name, curve_data_np) 
        # フーリエ変換
        FFT= fft(file_name + '_with_window', curve_data_np_after_windowing)

if __name__ == '__main__':
    file_list = glob.glob('./*.csv')
    fs = 44100 # 基本サンプリングレート

    main()
    print('finished')
    

以上

<広告>