1

編集: svick の推奨に従って、カスタム IPropagatorBlock を単純な TransformBlock に置き換えましたが、入力項目の順序と出力項目の順序が一致していません。渡す TransformBlock インスタンス化と Func の下:

quoteBuffer = new TransformBlock<Tuple<Symbol, int>, List<Quote>>(syncExecution, new ExecutionDataflowBlockOptions { SingleProducerConstrained = true,  MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

//Function that performs Sync Processing
Func<Tuple<Symbol, int>, List<Quote>> syncExecution = new Func<Tuple<Symbol, int>, List<Quote>>(partitionTuple =>
{
    Symbol symbol = partitionTuple.Item1;
    int partitionIndex = partitionTuple.Item2;

    //Read Binary Data
    byte[] byteArray = binaryDataReaders[symbol].ReadBytes(partitionIndex);

    //Deserialize and return quote list
    List<Quote> quoteList = dataInterfaces[symbol].Deserialize(symbol, byteArray);

    return quoteList;
});

そして、これは私が変換ブロックに投稿する方法です:

quoteBuffer.SendAsync(new Tuple<Symbol, int>(symbol, counter));

元の質問:

誰かが次のカスタム変換ブロックを手伝ってくれました。アイデアは、TInput をポスト/センダシンクし、TInput を非同期的に処理する一方で、カスタム変換ブロックが変換されたアイテムを返すときにポストされたアイテムの順序を保持することです。

たとえば、それぞれの順序で 1、2、3 をポストし、変換関数が各入力を 2 乗してアイテムを返す場合、正しい出力値と順序は 1、4、9 である必要があります。 .

ただし、出力順序が正しくないため、コードにエラーがあると思われます。さらに悪いことに、混乱した順序の場所はランダムであるため、デバッグが難しくなりますが、これは明らかに、入力要素を出力要素に変換するために開始されるタスクが常に異なる方法で完了するという事実を反映しています。

誰かが見て、私がここで見逃しているヒントを教えてもらえますか? ありがとう

public static IPropagatorBlock<TInput, TOutput> CreateConcurrentOrderedTransformBlock<TInput, TOutput>(Func<TInput, TOutput> transform)
    {
        var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

        var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
            tuple => tuple.Item2(transform(tuple.Item1)),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        var enqueuer = new ActionBlock<TInput>(
            async item =>
            {
                var tcs = new TaskCompletionSource<TOutput>();
                await processor.SendAsync(
                    new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
                await queue.SendAsync(tcs.Task);
            });

        enqueuer.Completion.ContinueWith(
            _ =>
            {
                queue.Complete();
                processor.Complete();
            });

        return DataflowBlock.Encapsulate(enqueuer, queue);
    }
4

1 に答える 1

0

このすべての問題を引き起こしたバグを見つけたので、私は自分の質問に答えます。ラムダ式からわかるように、データ ブロック内のバイト配列を読み取ります。つまり、並列度が 1 を超えるとすぐに、物理ディスクの同じファイルからバイト配列が同時に読み取られます。これは、バイトが読み取られる場所を実際に混乱させているようです。br.basestream.seek(...) で読み取り操作の開始点を設定し、br.readbytes(numberBytes) を介してバイトを読み取ります。いくつかの操作が同時にファイル内の場所に影響を与えるため、バイナリ リーダーは、混乱の原因となる順不同でバイトを読み取る可能性が最も高くなります。

ラムダ式からバイナリリーダーを引き出し、代わりに読み取りバイト配列を式に渡し、問題を解決する逆シリアル化とマージ/ソートの目的でのみ同時実行性を使用することで問題を解決しました。はい、変換ブロックは順序を保持します。tpl データフロー側で膨大な専門知識を共有してくれた svick に感謝します。

于 2012-10-12T19:07:13.030 に答える