5

ushort arrayA の各値を、同じ長さの ushort arrayB の対応するインデックス値からすばやく減算する必要があります。

さらに、差が負の場合は、負の差ではなくゼロを格納する必要があります。

(同じサイズの別の画像から 640x512 の画像を差し引いているため、正確には長さ = 327680 です)。

以下のコードは現在 20 ミリ秒ほどかかっていますが、できれば 5 ミリ秒以下に抑えたいと考えています。安全でないコードは問題ありませんが、私は安全でないコードを書くのが得意ではないので、例を挙げてください。

ありがとうございました!

public ushort[] Buffer { get; set; }

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
    sw.Start();

    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }

    Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
}

更新:厳密には C# ではありませんが、これを読んでいる他の人の利益のために、最終的に次のコードを使用してソリューションに C++ CLR クラス ライブラリを追加することになりました。約 3.1ms で実行されます。アンマネージ C++ ライブラリが使用されている場合、約 2.2 ミリ秒で実行されます。時差が小さかったので、管理されたライブラリを使用することにしました。

// SpeedCode.h
#pragma once
using namespace System;

namespace SpeedCode
{
    public ref class SpeedClass
    {
        public:
            static void SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength);
    };
}

// SpeedCode.cpp
// This is the main DLL file.
#include "stdafx.h"
#include "SpeedCode.h"

namespace SpeedCode
{
    void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength)
    {
        for (int index = 0; index < bufferLength; index++)
        {
            buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index]));
        }
    }
}

次に、次のように呼び出します。

    public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
    {
        System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length);

        Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
    }
4

6 に答える 6

5

これは興味深い質問です。

この最適化は JIT コンパイラーによって既に実行されているため、( TTat およびMaximum Cookieで示唆されているように) 結果が負にならないことをテストした後にのみ減算を実行しても、影響はほとんどありません。

( Selman22で提案されているように) タスクを並列化することは良い考えですが、ループがこの場合と同じくらい高速である場合、オーバーヘッドがゲインを上回ることになるため、私のテストではSelman22 の実装は実際には遅くなります。nick_w さんのベンチマークは、この事実を隠してデバッガを付けて作成されたのではないかと思います。

より大きなチャンクでタスクを並列化すると ( nick_wで提案されているように)、オーバーヘッドの問題に対処し、実際により高速なパフォーマンスを実現できますが、チャンクを自分で計算する必要はありません - を使用Partitionerしてこれを行うことができます。

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

上記の方法は、私のテストでは一貫してnick_w の手巻きのチャンクよりも優れています。

ちょっと待って!それだけではありません。

コードを遅くする本当の原因は、代入や演算ではありません。if声明です。パフォーマンスへの影響は、処理するデータの性質によって大きく左右されます。

nick_w のベンチマークでは、両方のバッファに対して同じ大きさのランダム データが生成されます。ただし、実際にはバックグラウンド バッファー内の平均マグニチュード データが小さい可能性が非常に高いと思います。この詳細は、分岐予測のために重要になる可能性があります (この古典的な SO 回答で説明されているように)。

通常、バックグラウンド バッファー内の値がバッファー内の値よりも小さい場合、JIT コンパイラーはこれを認識し、それに応じてその分岐を最適化できます。if各バッファ内のデータが同じランダム母集団からのものである場合、50% を超える精度でステートメントの結果を推測する方法はありません。nick_wがベンチマークを行っているのはこの後者のシナリオであり、これらの条件下では、安全でないコードを使用して bool を整数に変換し、分岐をまったく回避することで、メソッドをさらに最適化できる可能性があります。(次のコードは、bool がメモリ内でどのように表現されるかの実装の詳細に依存していることに注意してください。.NET 4.5 のシナリオでは機能しますが、必ずしも良い考えではなく、説明のためにここに示されています。)

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                unsafe
                {
                    var nonNegative = Buffer[i] > backgroundBuffer[i];
                    Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                        *((int*)(&nonNegative)));
                }
            }
        });
}

もう少し時間を節約したい場合は、言語を C++/CLI に切り替えることで、より安全な方法でこのアプローチに従うことができます。これにより、安全でないコードに頼ることなく算術式でブール値を使用できるようになります。

UInt16 MyCppLib::Maths::SafeSubtraction(UInt16 minuend, UInt16 subtrahend)
{
    return (UInt16)((minuend - subtrahend) * (minuend > subtrahend));
}

上記の静的メソッドを公開する C++/CLI を使用して純粋に管理された DLL を作成し、それを C# コードで使用できます。

public static void SubtractBackgroundFromBufferPartitionedParallelForEachCpp(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            Buffer[i] = 
                MyCppLib.Maths.SafeSubtraction(Buffer[i], backgroundBuffer[i]);
        }
    });
}

これは、上記のハックで安全でない C# コードよりも優れています。実際、並列化を忘れて C++/CLI を使用してメソッド全体を記述できるほど高速であり、それでも他の手法よりも優れたパフォーマンスを発揮します。

nick_w のテスト ハーネスを使用すると、上記の方法は、これまでにここで公開された他の提案よりも優れています。これが私が得た結果です(1〜4は彼が試したケースで、5〜7はこの回答で概説されているものです):

1. SubtractBackgroundFromBuffer(ms):                               2,021.37
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                    2,125.80
3. SubtractBackgroundFromBufferParallelFor(ms):                    3,431.58
4. SubtractBackgroundFromBufferBlockParallelFor(ms):               1,401.36
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):     1,197.76
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   742.72
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    499.27

ただし、実際にはバックグラウンド値が通常小さいと予想されるシナリオでは、分岐予測が成功すると全体的に結果が改善され、ifステートメントを回避するための「ハック」は実際には遅くなります。

バックグラウンド バッファーの値を範囲(バッファーの c. 10%)に制限したときに、 nick_w のテスト ハーネスを使用して取得した結果を次に示します。0-6500

1. SubtractBackgroundFromBuffer(ms):                                 773.50
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                      915.91
3. SubtractBackgroundFromBufferParallelFor(ms):                    2,458.36
4. SubtractBackgroundFromBufferBlockParallelFor(ms):                 663.76
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):       658.05
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   762.11
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    494.12

結果 1 ~ 5 はすべて劇的に改善されていることがわかります。これは、より優れた分岐予測の恩恵を受けているためです。結果 6 と 7 は、分岐を回避したため、あまり変化していません。

このデータの変更により、状況が完全に変わりました。このシナリオでは、最速のすべての C# ソリューションでさえ、元のコードよりもわずか 15% 高速です。

結論: 代表的なデータを使用して選択した方法を必ずテストしてください。そうしないと、結果が無意味になります。

于 2014-01-16T08:51:27.540 に答える
5

いくつかのベンチマーク。

  1. SubtractBackgroundFromBuffer:これは質問の元の方法です。
  2. SubtractBackgroundFromBufferWithCalcOpt:これは、TTat の計算速度向上のアイデアを追加したオリジナルの方法です。
  3. SubtractBackgroundFromBufferParallelFor:Selman22の回答からの解決策。
  4. SubtractBackgroundFromBufferBlockParallelFor:私の答え。3. と同様ですが、処理を 4096 値のブロックに分割します。
  5. SubtractBackgroundFromBufferPartitionedParallelForEach:ジェフの最初の答え。
  6. SubtractBackgroundFromBufferPartitionedParallelForEachHack:ジェフの2番目の答え。

アップデート

興味深いことに、SubtractBackgroundFromBufferBlockParallelFor(Bruno Costa が提案したように)

Buffer[i] = (ushort)Math.Max(difference, 0);

それ以外の

if (difference >= 0)
    Buffer[i] = (ushort)difference;
else
    Buffer[i] = 0;

結果

これは、各実行での 1000 回の反復の合計時間であることに注意してください。

SubtractBackgroundFromBuffer(ms):                                 2,062.23 
SubtractBackgroundFromBufferWithCalcOpt(ms):                      2,245.42
SubtractBackgroundFromBufferParallelFor(ms):                      4,021.58
SubtractBackgroundFromBufferBlockParallelFor(ms):                   769.74
SubtractBackgroundFromBufferPartitionedParallelForEach(ms):         827.48
SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):     539.60

したがって、これらの結果から、最良のアプローチは、計算の最適化を組み合わせて小さなゲインを得て、を利用しParallel.Forて画像のチャンクを操作するように思われます。もちろん、走行距離はさまざまであり、並列コードのパフォーマンスは、実行している CPU に左右されます。

テストハーネス

リリース モードで各メソッドに対してこれを実行しました。Stopwatch処理時間のみが測定されるように、この方法で開始および停止しています。

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447);

for (int i = 0; i < 1000; i++)
{
    Buffer = GenerateRandomBuffer(327680, 128011992);                

    sw.Start();
    SubtractBackgroundFromBuffer(bgImg);
    sw.Stop();
}

Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));


public static ushort[] GenerateRandomBuffer(int size, int randomSeed)
{
    ushort[] buffer = new ushort[327680];
    Random random = new Random(randomSeed);

    for (int i = 0; i < size; i++)
    {
        buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue);
    }

    return buffer;
}

メソッド

public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }
}

public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        if (Buffer[index] < backgroundBuffer[index])
        {
            Buffer[index] = 0;
        }
        else
        {
            Buffer[index] -= backgroundBuffer[index];
        }
    }
}

public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer)
{
    Parallel.For(0, Buffer.Length, (i) =>
    {
        int difference = Buffer[i] - backgroundBuffer[i];
        if (difference >= 0)
            Buffer[i] = (ushort)difference;
        else
            Buffer[i] = 0;
    });
}        

public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer)
{
    int blockSize = 4096;

    Parallel.For(0, (int)Math.Ceiling(Buffer.Length / (double)blockSize), (j) =>
    {
        for (int i = j * blockSize; i < (j + 1) * blockSize; i++)
        {
            int difference = Buffer[i] - backgroundBuffer[i];

            Buffer[i] = (ushort)Math.Max(difference, 0);                    
        }
    });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            unsafe
            {
                var nonNegative = Buffer[i] > backgroundBuffer[i];
                Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                    *((int*)(&nonNegative)));
            }
        }
    });
}
于 2014-01-16T01:53:41.567 に答える
1

実際に減算を実行する前に、結果が負になるかどうかを最初に確認することで、パフォーマンスがわずかに向上する場合があります。そうすれば、結果が負になる場合に減算を実行する必要はありません。例:

if (Buffer[index] > backgroundBuffer[index])
    Buffer[index] = (ushort)(Buffer[index] - backgroundBuffer[index]);
else
    Buffer[index] = 0;
于 2014-01-16T01:14:43.857 に答える
0

どうですか、

Enumerable.Range(0, Buffer.Length).AsParalell().ForAll(i =>
    {
         unsafe
        {
            var nonNegative = Buffer[i] > backgroundBuffer[i];
            Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                *((int*)(&nonNegative)));
        }
    });
于 2014-01-16T10:58:20.693 に答える