特定のウィンドウを指定して 1D 配列の移動平均を計算する Python 用の SciPy 関数または NumPy 関数またはモジュールはありますか?
30 に答える
更新:より効率的なソリューションが提案されていますuniform_filter1d
。scipy
おそらく「標準」のサードパーティ ライブラリの中で最高のものであり、いくつかの新しいライブラリや特殊なライブラリも利用できます。
そのために使用できますnp.convolve
:
np.convolve(x, np.ones(N)/N, mode='valid')
説明
移動平均は、畳み込みの数学的操作の場合です。移動平均の場合、ウィンドウを入力に沿ってスライドさせ、ウィンドウの内容の平均を計算します。離散 1D 信号の場合、畳み込みは同じことですが、平均の代わりに任意の線形結合を計算します。つまり、各要素に対応する係数を掛けて結果を加算します。ウィンドウ内の各位置に 1 つずつあるこれらの係数は、畳み込みカーネルと呼ばれることがあります。N 値の算術平均は(x_1 + x_2 + ... + x_N) / N
であるため、対応するカーネルは(1/N, 1/N, ..., 1/N)
であり、それはまさに を使用して得られるものnp.ones(N)/N
です。
エッジ
のmode
引数はnp.convolve
、エッジの処理方法を指定します。ここでモードを選択したのは、valid
ほとんどの人がランニング平均が機能することを期待していると思うからですが、他の優先事項があるかもしれません。モード間の違いを示すプロットは次のとおりです。
import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
plt.plot(np.convolve(np.ones(200), np.ones(50)/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()
効率的なソリューション
畳み込みは単純なアプローチよりもはるかに優れていますが、(私が推測するに) FFT を使用するため、非常に低速です。ただし、特に実行平均を計算する場合は、次のアプローチがうまく機能します
def running_mean(x, N):
cumsum = numpy.cumsum(numpy.insert(x, 0, 0))
return (cumsum[N:] - cumsum[:-N]) / float(N)
チェックするコード
In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop
numpy.allclose(result1, result2)
であることに注意してくださいTrue
。2 つの方法は同等です。N が大きいほど、時間の差が大きくなります。
警告: cumsum の方が高速ですが、浮動小数点エラーが増加し、結果が無効/不正確/許容できないものになる可能性があります
コメントは、ここでこの浮動小数点エラーの問題を指摘しましたが、ここで回答でより明確にしています。.
# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)
- 蓄積するポイントが多いほど、浮動小数点エラーが大きくなります (したがって、1e5 ポイントは顕著であり、1e6 ポイントは 1e6 よりも重要であり、アキュムレータをリセットする必要がある場合があります)。
- を使用してごまかすことができ
np.longdouble
ますが、浮動小数点エラーは比較的多数のポイントに対して依然として重要になります(約> 1e5ですが、データによって異なります) - エラーをプロットすると、比較的急速に増加することがわかります
- 畳み込みソリューションは遅くなりますが、この浮動小数点の精度の低下はありません
- uniform_filter1d ソリューションは、この cumsum ソリューションよりも高速であり、この浮動小数点の精度の低下はありません
更新:pandas.rolling_mean
以下の例は、最近のバージョンの pandas で削除された古い関数を示しています。その関数呼び出しに相当する現代的なものは、 pandas.Series.rollingを使用します:
In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]:
array([ 0.49815397, 0.49844183, 0.49840518, ..., 0.49488191,
0.49456679, 0.49427121])
これには、NumPy や SciPy よりもpandasの方が適しています。その関数rolling_meanは便利に仕事をします。また、入力が配列の場合は NumPy 配列を返します。
rolling_mean
カスタムの純粋な Python 実装でパフォーマンスを向上させることは困難です。以下は、提案された 2 つのソリューションに対するパフォーマンスの例です。
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: def running_mean(x, N):
...: cumsum = np.cumsum(np.insert(x, 0, 0))
...: return (cumsum[N:] - cumsum[:-N]) / N
...:
In [4]: x = np.random.random(100000)
In [5]: N = 1000
In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop
In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop
In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop
In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True
エッジ値を処理する方法に関する優れたオプションもあります。
次の方法で移動平均を計算できます。
import numpy as np
def runningMean(x, N):
y = np.zeros((len(x),))
for ctr in range(len(x)):
y[ctr] = np.sum(x[ctr:(ctr+N)])
return y/N
しかし、それは遅いです。
幸いなことに、numpy には、処理を高速化するために使用できるconvolve関数が含まれています。移動平均は、すべてのメンバーが に等しい長いx
ベクトルで畳み込むことと同じです。convolve の numpy 実装には開始トランジェントが含まれているため、最初の N-1 ポイントを削除する必要があります。N
1/N
def runningMeanFast(x, N):
return np.convolve(x, np.ones((N,))/N)[(N-1):]
私のマシンでは、入力ベクトルの長さと平均化ウィンドウのサイズにもよりますが、高速バージョンは 20 ~ 30 倍高速です。
convolve には'same'
、最初の一時的な問題に対処する必要があるように見えるモードが含まれていますが、最初と最後に分割されていることに注意してください。
または計算するpythonのモジュール
Tradewave.net での私のテストでは、TA-lib が常に勝ちます:
import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t
PAIR = info.primary_pair
PERIOD = 30
def initialize():
storage.reset()
storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])
def cumsum_sma(array, period):
ret = np.cumsum(array, dtype=float)
ret[period:] = ret[period:] - ret[:-period]
return ret[period - 1:] / period
def pandas_sma(array, period):
return pd.rolling_mean(array, period)
def api_sma(array, period):
# this method is native to Tradewave and does NOT return an array
return (data[PAIR].ma(PERIOD))
def talib_sma(array, period):
return ta.MA(array, period)
def convolve_sma(array, period):
return np.convolve(array, np.ones((period,))/period, mode='valid')
def fftconvolve_sma(array, period):
return scipy.signal.fftconvolve(
array, np.ones((period,))/period, mode='valid')
def tick():
close = data[PAIR].warmup_period('close')
t1 = t.time()
sma_api = api_sma(close, PERIOD)
t2 = t.time()
sma_cumsum = cumsum_sma(close, PERIOD)
t3 = t.time()
sma_pandas = pandas_sma(close, PERIOD)
t4 = t.time()
sma_talib = talib_sma(close, PERIOD)
t5 = t.time()
sma_convolve = convolve_sma(close, PERIOD)
t6 = t.time()
sma_fftconvolve = fftconvolve_sma(close, PERIOD)
t7 = t.time()
storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
storage.elapsed[-5] = storage.elapsed[-5] + t6-t5
storage.elapsed[-6] = storage.elapsed[-6] + t7-t6
plot('sma_api', sma_api)
plot('sma_cumsum', sma_cumsum[-5])
plot('sma_pandas', sma_pandas[-10])
plot('sma_talib', sma_talib[-15])
plot('sma_convolve', sma_convolve[-20])
plot('sma_fftconvolve', sma_fftconvolve[-25])
def stop():
log('ticks....: %s' % info.max_ticks)
log('api......: %.5f' % storage.elapsed[-1])
log('cumsum...: %.5f' % storage.elapsed[-2])
log('pandas...: %.5f' % storage.elapsed[-3])
log('talib....: %.5f' % storage.elapsed[-4])
log('convolve.: %.5f' % storage.elapsed[-5])
log('fft......: %.5f' % storage.elapsed[-6])
結果:
[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700 # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306
すぐに使用できるソリューションについては、https://scipy-cookbook.readthedocs.io/items/SignalSmooth.htmlを参照してください。flat
ウィンドウタイプの移動平均を提供します。これは、データを反映することでデータの最初と最後の問題を処理しようとするため、単純な自作の畳み込み方法よりも少し洗練されていることに注意してください (これはあなたのケースではうまくいかないかもしれません. ..)。
まず、次を試すことができます。
a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)
パーティーに少し遅れましたが、平均を見つけるために使用されるゼロで端またはパッドをラップしない独自の小さな関数を作成しました。さらに、線形間隔のポイントで信号を再サンプリングすることもできます。コードを自由にカスタマイズして、他の機能を取得します。
この方法は、正規化されたガウス カーネルを使用した単純な行列乗算です。
def running_mean(y_in, x_in, N_out=101, sigma=1):
'''
Returns running mean as a Bell-curve weighted average at evenly spaced
points. Does NOT wrap signal around, or pad with zeros.
Arguments:
y_in -- y values, the values to be smoothed and re-sampled
x_in -- x values for array
Keyword arguments:
N_out -- NoOf elements in resampled array.
sigma -- 'Width' of Bell-curve in units of param x .
'''
import numpy as np
N_in = len(y_in)
# Gaussian kernel
x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
# Normalize kernel, such that the sum is one along axis 1
normalization = np.tile(np.reshape(np.sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
gauss_kernel_normalized = gauss_kernel / normalization
# Perform running average as a linear operation
y_out = gauss_kernel_normalized @ y_in
return y_out, x_out
移動平均の計算については、上記の多くの回答があります。私の答えは、2 つの追加機能を追加します。
- nan 値を無視します
- 対象の値自体を含まない N 個の隣接値の平均を計算します
この 2 番目の機能は、どの値が一般的な傾向と一定量異なるかを判断するのに特に役立ちます。
私は numpy.cumsum を使用します。これは、最も時間効率の良い方法であるためです (上記の Alleo の回答を参照してください)。
N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x)
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)
このコードは、偶数 N に対してのみ機能します。padded_x と n_nan の np.insert を変更することで、奇数に合わせて調整できます。
このコードは、カットオフ = 3 個の非 nan 値よりも少ない数から計算されたすべての移動平均値を削除するように簡単に適用できます。
window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)
この質問は、NeXuS が先月書いたときよりもさらに古いものですが、彼のコードがエッジ ケースをどのように扱っているかが気に入っています。ただし、これは「単純移動平均」であるため、適用されるデータよりも結果が遅れます。valid
NumPy のモード、same
、よりも満足のいく方法でエッジ ケースを処理するには、ベース メソッドfull
に同様のアプローチを適用することで実現できると考えました。convolution()
私のコントリビューションでは、中央移動平均を使用して、その結果をデータと一致させています。フルサイズのウィンドウを使用するには使用できるポイントが少なすぎる場合、移動平均は、配列の端にある連続して小さいウィンドウから計算されます。[実際には、ウィンドウが次々と大きくなりますが、それは実装の詳細です。]
import numpy as np
def running_mean(l, N):
# Also works for the(strictly invalid) cases when N is even.
if (N//2)*2 == N:
N = N - 1
front = np.zeros(N//2)
back = np.zeros(N//2)
for i in range(1, (N//2)*2, 2):
front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
for i in range(1, (N//2)*2, 2):
back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])
を使用しているため比較的遅くconvolve()
、真の Pythonista によって大幅に改善される可能性がありますが、その考えは正しいと思います。