次の問題に直面しています。
オブジェクトのデータ ストリームがありFoo
、それらのオブジェクトをいくつかの同時インプロセス タスク/スレッドにストリーミングします。これらのタスク/スレッドは、オブジェクトを処理してオブジェクトを出力しFooResult
ます。それぞれのメンバーには、 の作成に使用されたものFooResult
と同じものが含まれています。ただし、必ずしもすべてが.Foo
FooResult
Foo
FooResult
私の問題は、このプロセス全体から、並行タスク内から作成されたFoo
可能性のある元のオブジェクトと、もしあればすべてのオブジェクトを含むラッピングオブジェクトを渡したいということです。FooResult
Foo
注: 現在、TPL データフローを使用していますが、各同時プロセスActionBlock<Foo>
は からリンクされている内で発生しますBroadCastBlock<Foo>
。SendAsync()
ターゲット データフロー ブロックを使用して、潜在的に作成された を送信しますFooResult
。明らかに、同時データ フロー ブロックFooResult
は予測できない時間に生成されます。これは、私が現在苦労していることです。FooResult
まとめて何個作成されたのか、ActionBlock<Foo>
オリジナルと一緒にまとめてFoo
ラッピングオブジェクトとして渡すことができるようには思えません。
擬似コードでは、現在次のようになっています。
BroadCastBlock<Foo> broadCastBlock;
ActionBlock<Foo> aBlock1;
ActionBlock<Foo> aBlock2;
ActionBlock<FooResult> targetBlock;
broadCastBlock.LinkTo(aBlock1); broadCastBlock.LinkTo(aBlock2);
aBlock1 = new ActionBlock<Foo>(foo =>
{
//do something here. Sometimes create a FooResult. If then
targetBlock.SendAsync(fooResult);
});
//similar for aBlock2
ただし、現在のコードの問題は、アクション ブロックのいずれかでFoo
a がシングルを生成しなかった場合、targetBlock が何も受信しない可能性があることです。FooResult
また、FooResult
各アクション ブロックがFooResult
.
私が望むのは、targetBlock がそれぞれを含むラッピング オブジェクトを受け取り、オブジェクトが作成された場合は のコレクションもFoo
受け取ることです。FooResult
FooResult
説明されている方法でソリューションを機能させるために私ができることはありますか? TPL Dataflow を熟読する必要はありませんが、熟読できれば便利です。
更新: 以下は、svick によって提案された JoinBlock の実装を通じて取得したものです。私はそれを使用するつもりはありません (パフォーマンスの観点から微調整できない限り)。実行が非常に遅いため、1 秒あたり約 89000 個のアイテム (およびそれは int 値型のみ) に達します。
public class Test
{
private BroadcastBlock<int> broadCastBlock;
private TransformBlock<int, int> transformBlock1;
private TransformBlock<int, int> transformBlock2;
private JoinBlock<int, int, int> joinBlock;
private ActionBlock<Tuple<int, int, int>> processorBlock;
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
joinBlock = new JoinBlock<int, int, int>();
processorBlock = new ActionBlock<Tuple<int, int, int>>(tuple =>
{
//Console.WriteLine("original value: " + tuple.Item1 + "tfb1: " + tuple.Item2 + "tfb2: " + tuple.Item3);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(joinBlock.Target1, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(joinBlock.Target3, new DataflowLinkOptions { PropagateCompletion = true });
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}
}
提案を反映するためのコードの更新:
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target1);
transformBlock2.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}
}