30

両方のトランスフォームブロックが完了したときにコードが完了するコードをどのように書き直すことができますか?完了とは、完了とマークされ、「アウトキュー」が空であることを意味すると思いましたか?

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 input count: " + transformBlock1.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

コードを編集し、各変換ブロックの入力バッファー数を追加しました。明らかに、100個すべてのアイテムが各変換ブロックにストリーミングされます。ただし、トランスフォームブロックの1つが終了するとすぐに、プロセッサブロックはそれ以上のアイテムを受け入れず、代わりに不完全なトランスフォームブロックの入力バッファが入力バッファをフラッシュするだけです。

4

5 に答える 5

36

問題はまさにcasperOneが彼の答えで言ったことです。最初の変換ブロックが完了すると、プロセッサブロックは「終了モード」になります。入力キュー内の残りのアイテムを処理しますが、新しいアイテムは受け入れません。

ただし、プロセッサブロックを2つに分割するよりも簡単な修正があります。設定しないでください。PropagateCompletion代わりに、両方の変換ブロックが完了したときにプロセッサブロックの完了を手動で設定してください。

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());
于 2012-11-23T21:52:49.373 に答える
30

ここでの問題は、ブロックをリンクするメソッドを呼び出すたびにPropagateCompletionプロパティを設定していることと、変換ブロックの待機時間が異なることです。LinkTo

インターフェイス(強調鉱山)のCompleteメソッドのドキュメントから:IDataflowBlock

IDataflowBlockに、これ以上メッセージを受け入れたり生成したり、延期されたメッセージを消費したりしてはならないことを通知します。

TransformBlock<TInput, TOutput>各インスタンスで待機時間をずらすため、 transformBlock2(20ミリ秒待機)はtransformBlock1(50ミリ秒待機)前に終了します。 最初に完了してから、「他に何も受け入れていません」transformBlock2というシグナルを送信します(まだすべてのメッセージを生成していません)。processorBlocktransformBlock1

transformBlock1beforeの処理は完全に保証されtransformBlock1ているわけではないことに注意してください。スレッドプール(デフォルトのスケジューラーを使用していると仮定)がタスクを異なる順序で処理することは可能です(ただし、20ミリ秒のアイテムが完了するとキューから作業を盗むため、おそらくそうではありません)。

パイプラインは次のようになります。

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          \              /
           processorBlock

これを回避するには、次のようなパイプラインが必要です。

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          |              |
 processorBlock1   processorBlock2

ActionBlock<TInput>これは、次のように2つの別々のインスタンスを作成するだけで実現されます。

// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);

// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);


// Linking
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });

次に、1つだけではなく、両方のプロセッサブロックで待機する必要があります。

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();

ここで非常に重要な注意事項。を作成するActionBlock<TInput>場合、デフォルトでは、渡されるインスタンスのMaxDegreeOfParallelismプロパティが1に設定されます。ExecutionDataflowBlockOptions

これは、渡すAction<T>デリゲートActionBlock<TInput>への呼び出しがスレッドセーフであり、一度に1つだけが実行されることを意味します。

同じデリゲートを指す2つの インスタンスがあるため、スレッドセーフは保証されません。ActionBlock<TInput>Action<T>

メソッドがスレッドセーフである場合は、何もする必要はありません(ブロックする理由がないため、MaxDegreeOfParallelismプロパティをに設定できます)。DataflowBlockOptions.Unbounded

スレッドセーフではなく、それを保証する必要がある場合は、lockステートメントのような従来の同期プリミティブに頼る必要があります。

この場合、そのようにします(ただし、クラスのWriteLineメソッドはスレッドセーフであるため、明らかに必要ありません)。Console

// The lock.
var l = new object();

// The action, can be a method, makes it easier to share.
Action<string> a = i => {
    // Ensure one call at a time.
    lock (l) Console.WriteLine(i);
};

// And so on...
于 2012-11-23T17:07:00.107 に答える
9

svickの答えに加えて、PropagateCompletionオプションで得られる動作と一貫性を保つために、前のブロックに障害が発生した場合に備えて例外を転送する必要もあります。次のような拡張メソッドもそれを処理します。

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
    if (target == null) return;
    if (sources.Length == 0) { target.Complete(); return; }
    Task.Factory.ContinueWhenAll(
        sources.Select(b => b.Completion).ToArray(),
        tasks => {
            var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
            if (exceptions.Count != 0) {
                target.Fault(new AggregateException(exceptions));
            } else {
                target.Complete();
            }
        }
    );
}
于 2013-04-09T09:49:52.160 に答える
1

ブロックに3つ以上のソースがある場合に、PropagateCompletion = trueが混乱する理由については、他の回答も非常に明確です。

この問題の簡単な解決策を提供するために、よりスマートな完了ルールが組み込まれているこの種の問題を解決するオープンソースライブラリDataflowExを調べることをお勧めします。(内部でTPL Dataflowリンクを使用しますが、複雑な完了の伝播をサポートします。実装はWhenAllに似ていますが、動的リンクの追加も処理します。実装の詳細については、 Dataflow.RegisterDependency()およびTaskEx.AwaitableWhenAll()を確認してください。)

DataflowExを使用してすべてが機能するように、コードを少し変更しました。

public CompletionDemo1()
{
    broadCaster = new BroadcastBlock<int>(
        i =>
            {
                return i;
            }).ToDataflow();

    transformBlock1 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

    transformBlock2 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("2 input count: " + transformBlock2.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

    processor = new ActionBlock<string>(
        i =>
            {
                Console.WriteLine(i);
            }).ToDataflow();

    /** rather than TPL linking
      broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
      broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
     **/

    //Use DataflowEx linking
    var transform1 = transformBlock1.ToDataflow();
    var transform2 = transformBlock2.ToDataflow();

    broadCaster.LinkTo(transform1);
    broadCaster.LinkTo(transform2);
    transform1.LinkTo(processor);
    transform2.LinkTo(processor);
}

完全なコードはここにあります。

免責事項:私は、MITライセンスの下で公開されているDataflowExの作成者です。

于 2014-12-19T09:01:19.657 に答える
1

これは、pktのCompleteWhenAllメソッドと機能的に同等ですが、コードがわずかに少ないメソッドです。

public static async void PropagateCompletion(IDataflowBlock[] sources,
    IDataflowBlock target)
{
    // Arguments validation omitted
    Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));

    try { await allSourcesCompletion.ConfigureAwait(false); } catch { }

    var exception = allSourcesCompletion.IsFaulted ?
        allSourcesCompletion.Exception : null;

    if (exception != null) target.Fault(exception); else target.Complete();
}

使用例:

PropagateCompletion(new[] { transformBlock1, transformBlock2 }, processorBlock);

このPropagateCompletionメソッドは、私がここに投稿した、同じ名前のより一般的なメソッドの変形です。

于 2020-05-06T15:30:12.790 に答える