12

私はさまざまなタスクでかなり典型的な生産者/消費者モデルを実行しています。

タスク1:バイナリファイルからbyte []のバッチを読み取り、バイト配列のコレクションごとに新しいタスクを開始します。(操作はメモリ管理の目的でバッチ処理されます)。

タスク2-n:これらはワーカータスクであり、それぞれがバイト配列の渡されたコレクション(Tasks1から)を操作し、バイト配列を逆シリアル化し、特定の基準で並べ替えてから、結果のオブジェクトのコレクション(各バイト配列)を格納します。コンカレントディクショナリでそのようなオブジェクトに逆シリアル化します)。

タスク(n + 1)このタスクの仕事は、タスク1から発生した方法と同じ順序で並行ディクショナリに格納されているコレクションをマージすることであるため、並行ディクショナリを選択しました。これを実現するには、collectionID(int型であり、Task1内の新しいコレクションごとにインクリメントされます)をTask1からこのタスクまでずっと渡します。このタスクは基本的に、次に予想されるcollectionIDが並行ディクショナリにすでに格納されているかどうかをチェックし、格納されている場合はそれを取り出してFinal Queueに追加し、並行ディクショナリ内の次のコレクションをチェックします。

さて、私が読んだことと私が見たビデオから、TPLデータフローはそのような生産者/消費者モデルの完璧な候補であるように思われます。TPL Dataflowを使ったことがないので、デザインを考案できず、始められないようです。スループットとレイテンシーの観点から、このライブラリーはタスクに対応していますか?私は現在、250万バイトの配列を処理しているため、結果のコレクションで1秒あたりのオブジェクトを処理しています。TPLデータフローは簡素化に役立ちますか?私は特に次の質問への回答に興味があります。TPL Dataflowは、ワーカータスクを生成し、ワーカータスクが作業を完了した後でそれらを再マージするときに、Task1からの収集バッチの順序を保持できますか?それは物事を最適化しますか?構造全体のプロファイルを作成した後、回転と同時収集が多すぎるためにかなりの時間が無駄になっていると感じています。

何かアイデア、考えはありますか?

4

2 に答える 2

11

編集:私は非常に間違っていたことが判明しました。並列処理用に構成されている場合でも、アイテムを受け取ったのと同じ順序で返しますTransformBlock そのため、私の元の回答のコードは完全に役に立たず、TransformBlock代わりに通常を使用できます。


元の答え:

私の知る限り、.Netの並列処理コンストラクトは、処理されたアイテムを受け取った順序で返すことをサポートしています。PLINQwith AsOrdered()。しかし、PLINQはあなたが望むものにうまく適合していないように私には思えます。

一方、TPL Dataflowはうまく適合していると思いますが、並列処理とアイテムの返送を同時にサポートするブロックはありません(TransformBlock両方をサポートしますが、同時にはサポートしません)。幸い、Dataflowブロックは構成可能性を念頭に置いて設計されているため、それを実行する独自のブロックを構築できます。

ただし、最初に、結果の順序付け方法を理解する必要があります。あなたが提案したように、いくつかの同期メカニズムとともに並行辞書を使用することは確かに機能します。しかし、もっと簡単な解決策があると思います。sのキューを使用しますTask。出力タスクでは、をデキューしTask、(非同期で)完了するのを待ち、完了すると、その結果を送信します。キューが空の場合でも同期が必要ですが、使用するキューを巧みに選択すれば、無料で同期を取得できます。

したがって、一般的な考え方は次のようになります。私たちが書いているのはIPropagatorBlock、いくつかの入力といくつかの出力を持つです。カスタムを作成する最も簡単な方法IPropagatorBlockは、入力を処理する1つのブロックと、結果を生成する別のブロックを作成し、を使用してそれらを1つとして扱うことDataflowBlock.Encapsulate()です。

入力ブロックは、着信アイテムを正しい順序で処理する必要があるため、並列化は行われません。新しいTask(実際にTaskCompletionSourceは、後で結果を設定できるようTaskに)を作成し、それをキューに追加してから、正しい結果を設定するための何らかの方法とともに、処理のためにアイテムを送信しますTask。このブロックを何かにリンクする必要がないため、を使用できますActionBlock

出力ブロックはTask、キューからsを取得し、非同期でそれらを待機してから、それらを送信する必要があります。ただし、すべてのブロックにはキューが埋め込まれており、デリゲートを受け取るブロックには非同期待機が組み込まれているため、これは非常に簡単ですnew TransformBlock<Task<TOutput>, TOutput>(t => t)。このブロックは、キューと出力ブロックの両方として機能します。このため、同期を処理する必要はありません。

パズルの最後のピースは、実際にはアイテムを並行して処理しています。ActionBlockこのために、今度はMaxDegreeOfParallelismセットで別のを使用できます。入力を受け取り、それを処理し、正しい結果をTaskキューに設定します。

まとめると、次のようになります。

public static IPropagatorBlock<TInput, TOutput>
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform)
{
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new TaskCompletionSource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer, queue);
}

たくさん話した後、それはかなり少量のコードだと思います。

パフォーマンスを重視しているようですので、このコードを微調整する必要があるかもしれません。たとえば、オーバーサブスクリプションを回避するためMaxDegreeOfParallelismに、processorブロックをのようなものに設定することが理にかなっている場合があります。Environment.ProcessorCountまた、スループットよりもレイテンシーの方が重要な場合MaxMessagesPerTaskは、同じブロックを1(または別の小さな数)に設定して、アイテムの処理が終了するとすぐに出力に送信されるようにするのが理にかなっている場合があります。

また、着信アイテムを抑制したい場合は、のセットを使用できBoundedCapacityますenqueuer

于 2012-06-15T17:46:02.320 に答える
0

はい、TPLデータフローライブラリはこのジョブに最適です。必要なすべての機能をサポートします:MaxDegreeOfParallelism、、。ただし、このオプションを使用するには、細部に注意を払う必要があります。BoundedCapacityEnsureOrderedBoundedCapacity

最初に、パイプラインの最初のブロックにメソッドをフィードすることを確認する必要がありますSendAsync。そうしないと、Postメソッドを使用してその戻り値を無視すると、メッセージが失われる可能性があります。ブロックのSendAsync内部バッファに着信メッセージ用の空き領域ができるまで呼び出し元を非同期にブロックするため、はメッセージを失うことはありません。

次に、ダウンストリームのブロックで発生する可能性のある例外がフィーダーを無期限にブロックしないようにし、決して来ない空き領域を待つ必要があります。ブロックを構成することによってこれを自動的に実行する組み込みの方法はありません。代わりに、ダウンストリームブロックの完了をアップストリームブロックに手動で伝播する必要があります。PropagateFailureこれは、以下の例のメソッドの意図です。

public static async Task ProcessAsync(string[] filePaths,
    ConcurrentQueue<MyClass> finalQueue)
{
    var reader = new TransformBlock<string, byte[]>(filePath =>
    {
        byte[] result = ReadBinaryFile(filePath);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // This is the default
        BoundedCapacity = 20, // keep memory usage under control
        EnsureOrdered = true // This is also the default
    });

    var deserializer = new TransformBlock<byte[], MyClass>(bytes =>
    {
        MyClass result = Deserialize(bytes);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = 20
    });

    var writer = new ActionBlock<MyClass>(obj =>
    {
        finalQueue.Enqueue(obj);
    });

    reader.LinkTo(deserializer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(deserializer, reader); // Link backwards

    deserializer.LinkTo(writer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(writer, deserializer); // Link backwards

    foreach (var filePath in filePaths)
    {
        var accepted = await reader.SendAsync(filePath).ConfigureAwait(false);
        if (!accepted) break; // This will happen in case that the block has failed
    }
    reader.Complete(); // This will be ignored if the block has already failed

    await writer.Completion; // This will propagate the first exception that occurred
}

public static async void PropagateFailure(IDataflowBlock block1,
    IDataflowBlock block2)
{
    try { await block1.Completion.ConfigureAwait(false); }
    catch (Exception ex) { block2.Fault(ex); }
}
于 2020-06-07T16:32:15.610 に答える