5

次のコードを最適化して実行速度を上げることができるかどうか疑問に思います。私は現在、非常に単純なデータフロー構造で1秒あたり約140万の単純なメッセージで最大になっているようです。このサンプルプロセスはメッセージを同期的に渡す/変換することを認識していますが、現在、タスクと同時収集に基づく独自のカスタムソリューションの可能な代替として、TPLデータフローをテストしています。「同時」という用語は、物事を並行して実行することをすでに示唆していますが、現在のテスト目的では、同期を介して独自のソリューションにメッセージをプッシュし、1秒あたり約510万のメッセージに到達します。ここで欠けているのは、TPL Dataflowが高スループット、低遅延のソリューションとしてプッシュされたことを読みましたが、これまでのところ、パフォーマンスの微調整を見落としているに違いありません。私を正しい方向に向けることができる人はいますか?

class TPLDataFlowExperiments
{
    public TPLDataFlowExperiments()
    {
        var buf1 = new BufferBlock<int>();

        var transform = new TransformBlock<int, string>(t =>
            {
                return "";
            });

        var action = new ActionBlock<string>(s =>
            {
                //Thread.Sleep(100);
                //Console.WriteLine(s);
            });

        buf1.LinkTo(transform);
        transform.LinkTo(action);

        //Propagate all Completions down the flow
        buf1.Completion.ContinueWith(t =>
        {
            transform.Complete();
            transform.Completion.ContinueWith(u =>
            {
                action.Complete();
            });
        });

        Stopwatch watch = new Stopwatch();
        watch.Start();

        int cap = 10000000;
        for (int i = 0; i < cap; i++)
        {
            buf1.Post(i);
        }

        //Mark Buffer as Complete
        buf1.Complete();

        action.Completion.ContinueWith(t =>
            {
                watch.Stop();

                Console.WriteLine("All Blocks finished processing");
                Console.WriteLine("Units processed per second: " + cap / watch.ElapsedMilliseconds * 1000);
            });

        Console.ReadLine();
    }
}
4

4 に答える 4

9

これは主に1つのことに帰着すると思います:あなたのテストはほとんど無意味です。これらのブロックはすべて何かを実行することになっており、複数のコアと非同期操作を使用してそれを実行します。

また、テストでは、同期に多くの時間が費やされている可能性があります。より現実的なコードを使用すると、コードの実行に時間がかかるため、競合が少なくなり、実際のオーバーヘッドは測定したものよりも小さくなります。

しかし、実際にあなたの質問に答えるために、はい、あなたはいくつかのパフォーマンスの微調整を見落としています。具体的にはSingleProducerConstrained、これは、ロックの少ないデータ構造を使用できることを意味します。これを両方のブロックで使用すると(BufferBlockここでは完全に役に立たないので、安全に削除できます)、速度は1秒あたり約300万から400万のアイテムから、私のコンピューターでは500万を超えるアイテムに上昇します。

于 2012-06-22T13:07:52.220 に答える
2

svickの答えに追加するために、テストは単一のアクションブロックに対して単一の処理スレッドのみを使用します。このようにして、ブロックを使用するオーバーヘッド以外は何もテストしません。

DataFlowは、F#エージェント、Scalaアクター、およびMPI実装と同様に機能します。各アクションブロックは、一度に1つのタスクを実行し、入力をリッスンして出力を生成します。スピードアップは、複数のコアで独立して実行できるステップでアルゴリズムを分割し、メッセージのみを相互に渡すことによって提供されます。

並行タスクの数を増やすことはできますが、最も重要な問題は、他のタスクとは独立して最大量のステップを実行するフローを設計することです。

于 2012-06-22T14:21:52.123 に答える
0

データフローブロックの並列度を上げることもできます。これにより、速度がさらに向上する可能性があり、ブロックの1つが残りのブロックのボトルネックとして機能していることがわかった場合は、線形タスク間の負荷分散にも役立ちます。

于 2013-12-03T21:17:25.660 に答える
0

ワークロードが非常に細かく、1秒あたり数百万のメッセージを処理すると予想される場合、関連するオーバーヘッドのために、パイプラインを介して個々のメッセージを渡すことは実行不可能になります。メッセージを配列またはリストにバッチ処理して、ワークロードをチャンク化する必要があります。例えば:

var transform = new TransformBlock<int[], string[]>(batch =>
{
    var results = new string[batch.Length];
    for (int i = 0; i < batch.Length; i++)
    {
        results[i] = ProcessItem(batch[i]);
    }
    return results;
});

入力をバッチ処理するには、 System.InteractiveBatchBlockパッケージの「linqy」Buffer拡張メソッド、またはパッケージの同様の機能メソッドを使用するか、手動で実行します。BatchMoreLinq

于 2020-06-11T11:42:03.357 に答える