5

n-TransformBlocksによってリンクできるJoinBlockの代替手段を探し、そのコレクションを別のデータフローブロックに渡すために、すべてのTransformBlockソースブロックのメッセージを結合/マージします。

JoinBlockは正常に機能しますが、最大3つのソースブロックをフックするように制限されています。また、かなりの数の非効率性に悩まされています(2つのソースブロックの値型(int)でさえ結合するのが非常に遅い)。TransformBlocksからタスクを返し、すべてのTransformBlocksがタスクを完了するまで待ってから、受け入れる方法はありTask<item>ますか?

別のアイデアはありますか?結合されたアイテムコレクションを渡す前に結合する必要のあるアイテムを1〜20個持つ可能性があります。各変換ブロックは、「変換された」入力項目ごとに1つの出力項目を返すことが保証されています。

編集:要求された説明:

以前の質問の1つに従って、JoinBlocksを次のように設定しました。

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking
    broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
    broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
    transformBlock1.LinkTo(joinBlock.Target1);
    transformBlock2.LinkTo(joinBlock.Target2);
    joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public void Start()
{
    Stopwatch watch = new Stopwatch();
    watch.Start();

    const int numElements = 1000000;

    for (int i = 1; i <= numElements; i++)
    {
        broadCastBlock.Post(i);
    }

    ////mark completion
    broadCastBlock.Complete();
    Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


    processorBlock.Completion.Wait();

    watch.Stop();

    Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
    Console.ReadLine();
}
4

2 に答える 2

6

これを行う1つの方法は、をに設定して使用BatchBlockすることです。この構成では、ブロックは、消費されるのを待っているさまざまなブロックからのアイテムが存在するまで何もしません(ここで、は作成時に設定した数です)。その場合、すべてのアイテムを一度に消費し、すべてのアイテムを含む配列を生成します。GreedyfalsennnBatchBlockn

このソリューションの注意点の1つは、結果の配列が並べ替えられないことです。つまり、どのアイテムがどのソースからのものであるかがわかりません。そして、そのパフォーマンスがどのように比較されるのかわかりませんJoinBlock。自分でテストする必要があります。BatchBlock(ただし、欲張りでない消費に必要なオーバーヘッドのために、この方法を使用する方が遅いかどうかはわかります。)

于 2012-12-02T10:16:16.347 に答える
0

アイテムごとに複数の並列操作を実行する場合は、IMHOがこれらの操作を複数のブロックに分割してから、独立した結果を1つのオブジェクトに再度結合するのではなく、1つのブロック内で実行する方が理にかなっています。だから私の提案はこのようなことをすることです:

var block = new TransformBlock<MyClass, MyClass>(async item =>
{
    Task<SomeType1> task1 = Task.Run(() => CalculateProperty1(item.Id));
    Task<SomeType2> task2 = Task.Run(() => CalculateProperty2(item.Id));
    await Task.WhenAll(task1, task2).ConfigureAwait(false);
    item.Property1 = task1.Result;
    item.Property2 = task2.Result;
    return item;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2
});

上記の例では、タイプのアイテムは。MyClassを介して渡されますTransformBlock。プロパティProperty1Property2各アイテムのプロパティは、プロパティごとに個別に使用して並行して計算されTaskます。次に、両方のタスクが待機され、両方が完了すると、結果がアイテムのプロパティに割り当てられます。最後に、処理されたアイテムが返されます。

このアプローチで注意したいのは、並列処理の程度が内部並列操作とMaxDegreeOfParallelismブロックのオプションの積になるということだけです。したがって、上記の例では、並列度は2 x 2 = 4になります。正確には、2つの内部計算の一方が他方よりも遅くなる可能性があるため、これが最大の並列度になります。したがって、任意の時点で、実際の並列度は2から4の間のいずれかになります。

于 2020-06-11T10:59:11.887 に答える