編集:私は非常に間違っていたことが判明しました。並列処理用に構成されている場合でも、アイテムを受け取ったのと同じ順序で返しますTransformBlock
。そのため、私の元の回答のコードは完全に役に立たず、TransformBlock
代わりに通常を使用できます。
元の答え:
私の知る限り、.Netの並列処理コンストラクトは、処理されたアイテムを受け取った順序で返すことをサポートしています。PLINQwith AsOrdered()
。しかし、PLINQはあなたが望むものにうまく適合していないように私には思えます。
一方、TPL Dataflowはうまく適合していると思いますが、並列処理とアイテムの返送を同時にサポートするブロックはありません(TransformBlock
両方をサポートしますが、同時にはサポートしません)。幸い、Dataflowブロックは構成可能性を念頭に置いて設計されているため、それを実行する独自のブロックを構築できます。
ただし、最初に、結果の順序付け方法を理解する必要があります。あなたが提案したように、いくつかの同期メカニズムとともに並行辞書を使用することは確かに機能します。しかし、もっと簡単な解決策があると思います。sのキューを使用しますTask
。出力タスクでは、をデキューしTask
、(非同期で)完了するのを待ち、完了すると、その結果を送信します。キューが空の場合でも同期が必要ですが、使用するキューを巧みに選択すれば、無料で同期を取得できます。
したがって、一般的な考え方は次のようになります。私たちが書いているのはIPropagatorBlock
、いくつかの入力といくつかの出力を持つです。カスタムを作成する最も簡単な方法IPropagatorBlock
は、入力を処理する1つのブロックと、結果を生成する別のブロックを作成し、を使用してそれらを1つとして扱うことDataflowBlock.Encapsulate()
です。
入力ブロックは、着信アイテムを正しい順序で処理する必要があるため、並列化は行われません。新しいTask
(実際にTaskCompletionSource
は、後で結果を設定できるようTask
に)を作成し、それをキューに追加してから、正しい結果を設定するための何らかの方法とともに、処理のためにアイテムを送信しますTask
。このブロックを何かにリンクする必要がないため、を使用できますActionBlock
。
出力ブロックはTask
、キューからsを取得し、非同期でそれらを待機してから、それらを送信する必要があります。ただし、すべてのブロックにはキューが埋め込まれており、デリゲートを受け取るブロックには非同期待機が組み込まれているため、これは非常に簡単ですnew TransformBlock<Task<TOutput>, TOutput>(t => t)
。このブロックは、キューと出力ブロックの両方として機能します。このため、同期を処理する必要はありません。
パズルの最後のピースは、実際にはアイテムを並行して処理しています。ActionBlock
このために、今度はMaxDegreeOfParallelism
セットで別のを使用できます。入力を受け取り、それを処理し、正しい結果をTask
キューに設定します。
まとめると、次のようになります。
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
たくさん話した後、それはかなり少量のコードだと思います。
パフォーマンスを重視しているようですので、このコードを微調整する必要があるかもしれません。たとえば、オーバーサブスクリプションを回避するためMaxDegreeOfParallelism
に、processor
ブロックをのようなものに設定することが理にかなっている場合があります。Environment.ProcessorCount
また、スループットよりもレイテンシーの方が重要な場合MaxMessagesPerTask
は、同じブロックを1(または別の小さな数)に設定して、アイテムの処理が終了するとすぐに出力に送信されるようにするのが理にかなっている場合があります。
また、着信アイテムを抑制したい場合は、のセットを使用できBoundedCapacity
ますenqueuer
。