編集: svick の推奨に従って、カスタム IPropagatorBlock を単純な TransformBlock に置き換えましたが、入力項目の順序と出力項目の順序が一致していません。渡す TransformBlock インスタンス化と Func の下:
quoteBuffer = new TransformBlock<Tuple<Symbol, int>, List<Quote>>(syncExecution, new ExecutionDataflowBlockOptions { SingleProducerConstrained = true, MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
//Function that performs Sync Processing
Func<Tuple<Symbol, int>, List<Quote>> syncExecution = new Func<Tuple<Symbol, int>, List<Quote>>(partitionTuple =>
{
Symbol symbol = partitionTuple.Item1;
int partitionIndex = partitionTuple.Item2;
//Read Binary Data
byte[] byteArray = binaryDataReaders[symbol].ReadBytes(partitionIndex);
//Deserialize and return quote list
List<Quote> quoteList = dataInterfaces[symbol].Deserialize(symbol, byteArray);
return quoteList;
});
そして、これは私が変換ブロックに投稿する方法です:
quoteBuffer.SendAsync(new Tuple<Symbol, int>(symbol, counter));
元の質問:
誰かが次のカスタム変換ブロックを手伝ってくれました。アイデアは、TInput をポスト/センダシンクし、TInput を非同期的に処理する一方で、カスタム変換ブロックが変換されたアイテムを返すときにポストされたアイテムの順序を保持することです。
たとえば、それぞれの順序で 1、2、3 をポストし、変換関数が各入力を 2 乗してアイテムを返す場合、正しい出力値と順序は 1、4、9 である必要があります。 .
ただし、出力順序が正しくないため、コードにエラーがあると思われます。さらに悪いことに、混乱した順序の場所はランダムであるため、デバッグが難しくなりますが、これは明らかに、入力要素を出力要素に変換するために開始されるタスクが常に異なる方法で完了するという事実を反映しています。
誰かが見て、私がここで見逃しているヒントを教えてもらえますか? ありがとう
public static IPropagatorBlock<TInput, TOutput> CreateConcurrentOrderedTransformBlock<TInput, TOutput>(Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}