n-TransformBlocksによってリンクできるJoinBlockの代替手段を探し、そのコレクションを別のデータフローブロックに渡すために、すべてのTransformBlockソースブロックのメッセージを結合/マージします。
JoinBlockは正常に機能しますが、最大3つのソースブロックをフックするように制限されています。また、かなりの数の非効率性に悩まされています(2つのソースブロックの値型(int)でさえ結合するのが非常に遅い)。TransformBlocksからタスクを返し、すべてのTransformBlocksがタスクを完了するまで待ってから、受け入れる方法はありTask<item>
ますか?
別のアイデアはありますか?結合されたアイテムコレクションを渡す前に結合する必要のあるアイテムを1〜20個持つ可能性があります。各変換ブロックは、「変換された」入力項目ごとに1つの出力項目を返すことが保証されています。
編集:要求された説明:
以前の質問の1つに従って、JoinBlocksを次のように設定しました。
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target1);
transformBlock2.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}