10

以下に含まれるpythonでネストされたループを実行しています。これは、既存の金融時系列を検索し、特定の特性に一致する時系列の期間を探す基本的な方法として機能します。

この場合、「終値」(つまり、資産の価格) と「ボリューム」(つまり、期間中に交換された資産の金額) を表す、同じサイズの 2 つの別個の配列があります。期間ごとに、長さが 1 ~ の将来のすべての間隔を楽しみにしてINTERVAL_LENGTH、これらの間隔のいずれかに検索に一致する特性があるかどうかを確認します (この場合、終値の比率は 1.0001 より大きく、1.0001 より小さくなります)。 1.5 で合計ボリュームが 100 より大きい)。

私の理解では、NumPy を使用した場合の高速化の主な理由の 1 つは、配列全体を操作している限り、インタープリターが何かを評価するたびにオペランドを型チェックする必要がないことです (例: numpy_array * 2) 、しかし明らかに以下のコードはそれを利用していません。

内部ループをある種のウィンドウ関数に置き換えてスピードアップする方法、またはnumpy/scipyを使用してネイティブ python で大幅にスピードアップする方法はありますか?

あるいは、一般的にこれを行うためのより良い方法はありますか (たとえば、このループを C++ で記述して weave を使用する方がはるかに高速でしょうか)?

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.array( xrange(ARRAY_LENGTH) )
volume = np.array( xrange(ARRAY_LENGTH) )
close, volume = close.astype('float64'), volume.astype('float64')

results = []
for i in xrange(len(close) - INTERVAL_LENGTH):
    for j in xrange(i+1, i+INTERVAL_LENGTH):
        ret = close[j] / close[i]
        vol = sum( volume[i+1:j+1] )
        if ret > 1.0001 and ret < 1.5 and vol > 100:
            results.append( [i, j, ret, vol] )
print results
4

3 に答える 3

7

更新: 「new_function2」の下の (ほぼ) 完全にベクトル化されたバージョン...

少し説明するためにコメントを追加します。

〜50倍のスピードアップが得られ、出力がリストではなくnumpy配列であることに問題がなければ、さらに大きなスピードアップが可能です。そのまま:

In [86]: %timeit new_function2(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 1.15 s per loop

内部ループを np.cumsum() の呼び出しに置き換えることができます...以下の「new_function」関数を参照してください。これにより、かなりのスピードアップが得られます...

In [61]: %timeit new_function(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 15.7 s per loop

In [62]: %timeit old_function(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 53.1 s per loop

ただし、全体をベクトル化し、for ループを完全に回避することは可能であるはずです... ちょっと待ってください。

import numpy as np

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.arange(ARRAY_LENGTH, dtype=np.float)
volume = np.arange(ARRAY_LENGTH, dtype=np.float)

def old_function(close, volume, INTERVAL_LENGTH):
    results = []
    for i in xrange(len(close) - INTERVAL_LENGTH):
        for j in xrange(i+1, i+INTERVAL_LENGTH):
            ret = close[j] / close[i]
            vol = sum( volume[i+1:j+1] )
            if (ret > 1.0001) and (ret < 1.5) and (vol > 100):
                results.append( (i, j, ret, vol) )
    return results


def new_function(close, volume, INTERVAL_LENGTH):
    results = []
    for i in xrange(close.size - INTERVAL_LENGTH):
        vol = volume[i+1:i+INTERVAL_LENGTH].cumsum()
        ret = close[i+1:i+INTERVAL_LENGTH] / close[i]

        filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100)
        j = np.arange(i+1, i+INTERVAL_LENGTH)[filter]

        tmp_results = zip(j.size * [i], j, ret[filter], vol[filter])
        results.extend(tmp_results)
    return results

def new_function2(close, volume, INTERVAL_LENGTH):
    vol, ret = [], []
    I, J = [], []
    for k in xrange(1, INTERVAL_LENGTH):
        start = k
        end = volume.size - INTERVAL_LENGTH + k
        vol.append(volume[start:end])
        ret.append(close[start:end])
        J.append(np.arange(start, end))
        I.append(np.arange(volume.size - INTERVAL_LENGTH))

    vol = np.vstack(vol)
    ret = np.vstack(ret)
    J = np.vstack(J)
    I = np.vstack(I)

    vol = vol.cumsum(axis=0)
    ret = ret / close[:-INTERVAL_LENGTH]

    filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100)

    vol = vol[filter]
    ret = ret[filter]
    I = I[filter]
    J = J[filter]

    output = zip(I.flat,J.flat,ret.flat,vol.flat)
    return output

results = old_function(close, volume, INTERVAL_LENGTH)
results2 = new_function(close, volume, INTERVAL_LENGTH)
results3 = new_function(close, volume, INTERVAL_LENGTH)

# Using sets to compare, as the output 
# is in a different order than the original function
print set(results) == set(results2)
print set(results) == set(results3)
于 2010-06-18T02:05:59.787 に答える
3

sum高速化の 1 つは、この実装では長さ 2 から までのリストを合計するため、この部分を削除することINTERVAL_LENGTHです。代わりにvolume[j+1]、ループの最後の反復からの vol の前の結果に加算するだけです。したがって、リスト全体を合計して毎回スライスするのではなく、毎回 2 つの整数を追加するだけです。また、 を実行することから始めるのではなく、 を実行sum(volume[i+1:j+1])するだけです。これはvol = volume[i+1] + volume[j+1]、ここでの最初のケースが常に 2 つのインデックスになることがわかっているためです。

別の高速化は、python 実装の実行が大幅に高速化されているため、 の.extend代わりに使用することです。.appendextend

if必要に応じて特定の計算のみを行うように、最後のステートメントを分割することもできます。たとえば、if vol <= 100を計算する必要はありませんret

これはあなたの問題を正確に解決するものではありませんが、特に合計の問題では、これらの変更により大幅なスピードアップが見られるはずです。

len編集 -リストの長さはすでに明確にわかっているため、も必要ありません(それが単なる例でない限り)。ではなく数値として定義する方len(something)が常に高速です。

編集 - 実装 (これはテストされていません):

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.array( xrange(ARRAY_LENGTH) )
volume = np.array( xrange(ARRAY_LENGTH) )
close, volume = close.astype('float64'), volume.astype('float64')

results = []
ex = results.extend
for i in xrange(ARRAY_LENGTH - INTERVAL_LENGTH):
    vol = volume[i+1]
    for j in xrange(i+1, i+INTERVAL_LENGTH):
        vol += volume[j+1]
        if vol > 100:
            ret = close[j] / close[i]
            if 1.0001 < ret < 1.5:
                ex( [i, j, ret, vol] )
print results
于 2010-06-17T21:22:51.120 に答える
1

次のような単一のリストとして結果を生成しようとしないのはなぜですか (追加または拡張よりもはるかに高速です)。

results = [ t for t in ( (i, j, close[j]/close[i], sum(volume[i+1:j+1]))
                         for i in xrange(len(close)-INT_LEN)
                             for j in xrange(i+1, i+INT_LEN)
                       )
            if t[3] > 100 and 1.0001 < t[2] < 1.5
          ]
于 2010-06-18T04:32:46.767 に答える