12

現在のプロジェクトでは、2 つの 3 次元配列を少し変わった方法で"畳み込む" 必要があります。

次元が dimA と dimB の 2 つの 3 次元配列 A と B があるとします (すべての軸で同じ)。ここで、すべての軸の次元が dimA+dimB である 3 番目の配列 C を作成します。

C のエントリは次のように計算されます。

c_{x1+x2,y1+y2,z1+z2} += a_{x1,y1,z1} * b_{x2,y2,z2}

私の現在のバージョンは簡単です:

dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA+dimB

C = np.zeros((dimC,dimC,dimC))
for x1 in range(dimA):
    for x2 in range(dimB):
        for y1 in range(dimA):
            for y2 in range(dimB):
                for z1 in range(dimA):
                    for z2 in range(dimB):
                        x = x1+x2
                        y = y1+y2
                        z = z1+z2
                        C[x,y,z] += A[x1,y1,z1] * B[x2,y2,z2] 

残念ながら、このバージョンは非常に遅く、使用できません。

私の2番目のバージョンは次のとおりです。

C = scipy.signal.fftconvolve(A,B,mode="full")

しかし、これは要素のみを計算しますmax(dimA,dimB)

誰かがより良いアイデアを持っていますか?

4

3 に答える 3

5

Numbaを使ってみましたか? 通常は遅い Python コードを JIT コンパイラでラップできるようにするパッケージです。Numba を使用して問題を簡単に突き止めたところ、速度が大幅に向上しました。IPython のマジックtimeitマジック関数を使用すると、このcustom_convolution関数は約 18 秒かかりましたが、Numba の最適化された関数は 10.4 ミリ秒かかりました。これは 1700 を超える高速化です。

Numba の実装方法は次のとおりです。

import numpy as np
from numba import jit, double

s = 15
array_a = np.random.rand(s ** 3).reshape(s, s, s)
array_b = np.random.rand(s ** 3).reshape(s, s, s)

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
fast_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

両方の関数の結果の間の残差を計算すると、ゼロが得られます。これは、JIT 実装が問題なく機能していることを意味します。

slow_result = custom_convolution(array_a, array_b) 
fast_result = fast_convolution(array_a, array_b)

print np.max(np.abs(slow_result - fast_result))

これで得られる出力は0.0.

現在の Python セットアップに Numba をインストールするか、continuum.io のAnacondaCEパッケージを使用してすばやく試すことができます。

最後になりましたが、Numba の関数は関数よりscipy.signal.fftconvolve数倍高速です。

注: 私は AnacondaCE ではなく Anaconda を使用しています。2 つのパッケージの Numba のパフォーマンスには多少の違いがありますが、それほど大きくは変わらないと思います。

于 2013-02-25T12:39:54.537 に答える
4

一般的なルールは、ジョブに正しいアルゴリズムを使用することです。これは、畳み込みカーネルがデータに比べて短い場合を除き、FFT ベースの畳み込みです (短いとは、おおよそ log2(n) 未満を意味します。ここで、n はデータの長さです)。 .

あなたの場合、2 つの等しいサイズのデータ​​セットを畳み込んでいるので、おそらく FFT ベースの畳み込みを検討する必要があります。

明らかに、scipy.signal.fftconvolveこの点でタッチが不足しています。より高速な FFT アルゴリズムを使用すると、独自の畳み込みルーチンをローリングすることで、はるかに優れた結果を得ることができます (fftconvolve が変換サイズを 2 の累乗に強制することは役に立ちません。そうしないと、モンキー パッチが適用される可能性があります)。

次のコードは、 FFTWのラッパーであるpyfftwを使用し、カスタム畳み込みクラスを作成します。CustomFFTConvolution

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

これは次のように使用されます。

custom_fft_conv = CustomFFTConvolution(A, B)
C = custom_fft_conv(A, B) # This can contain different values to during construction

threadsクラスを構築するときにオプションの引数を使用します。クラスを作成する目的は、事前に変換を計画する FFTW の機能を活用することです。

以下の完全なデモ コードは、タイミングなどに対する @Kelsey の回答を単純に拡張したものです。

スピードアップは、numba ソリューションとバニラの fftconvolve ソリューションの両方でかなりのものです。n = 33 の場合、両方よりも約 40 ~ 45 倍高速です。

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
import pyfftw

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

def run_test():
    reps = 10
    nt, ft, cft, cft2 = [], [], [], []
    x = range(2, 34)

    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)

        custom_fft_conv = CustomFFTConvolution(A, B)
        custom_fft_conv_nthreads = CustomFFTConvolution(A, B, threads=2)

        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        C3 = custom_fft_conv(A, B)
        C4 = custom_fft_conv_nthreads(A, B)

        assert np.allclose(C1[:-1, :-1, :-1], C2)
        assert np.allclose(C1[:-1, :-1, :-1], C3)
        assert np.allclose(C1[:-1, :-1, :-1], C4)

        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv(A, B))
        cft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv_nthreads(A, B))
        cft2.append(t.timeit(number=reps))

    plt.plot(x, ft, label='scipy.signal.fftconvolve')
    plt.plot(x, nt, label='custom numba convolve')
    plt.plot(x, cft, label='custom pyfftw convolve')
    plt.plot(x, cft2, label='custom pyfftw convolve with threading')        
    plt.legend()
    plt.show()

if __name__ == '__main__':
    run_test()

編集: 最近の scipy は、常に 2 の累乗の長さにパディングするわけではないため、出力が pyFFTW ケースに近くなります。

于 2013-03-01T19:45:33.853 に答える
3

上記のNumbaメソッドは巧妙なトリックですが、Nが比較的小さい場合にのみ利点があります.O (N^6)アルゴリズムは、実装の速さに関係なく、毎回あなたを殺します. 私のテストでは、fftconvolveメソッドは N=20 付近でクロスオーバーし、N=32 までは 10 倍速くなります。custom_convolution上記の定義を除外します。

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

if __name__ == '__main__':
    reps = 3
    nt, ft = [], []
    x = range(2, 34)
    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)
        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        assert np.allclose(C1[:-1, :-1, :-1], C2)
        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
    plt.plot(x, ft, x, nt)
    plt.show()

また、FFT は 2 の累乗ではるかに高速になるため、N にも大きく依存します。FFT バージョンの時間は、N=17 から N=32 まで基本的に一定であり、N=33 ではさらに高速です。再び急速に発散し始めます。

Numba で FFT 実装をラップすることはできますが、scipy バージョンで直接行うことはできません。

(新しい回答を作成して申し訳ありませんが、直接返信するポイントがありません。)

于 2013-02-25T20:06:54.637 に答える