5

次の場合:

BufferBlock<int> sourceBlock = new BufferBlock<int>();
TransformBlock<int, int> targetBlock = new TransformBlock<int, int>(element =>
{
    return element * 2;
});

sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

//feed some elements into the buffer block
for(int i = 1; i <= 1000000; i++)
{
    sourceBlock.SendAsync(i);
}

sourceBlock.Complete();

targetBlock.Completion.ContinueWith(_ =>
{
    //notify completion of the target block
});

targetBlock完了していないようです。その理由は、を他のDataflowブロックにTransformBlock targetBlockリンクしていないため、のすべてのアイテムが出力キューで待機しているためだと思います。targetBlockしかし、私が実際に達成したいのは、(A)targetBlock完了が通知され、(B)入力キューが空になったときの通知です。アイテムがまだの出力キューにあるかどうかは気にしたくありませんTransformBlock。どうすればそれについて行くことができますか?sourceBlockANDの完了ステータスを照会してInputCount、のtargetBlockがゼロであることを確認したいものを取得する唯一の方法はありますか?これが非常に安定しているかどうかはわかりません(の最後のアイテムがに渡されたsourceBlock場合にのみ、本当に完了とマークされますsourceBlocktargetBlock?)。同じ目標を達成するためのよりエレガントで効率的な方法はありますか?

sourceBlock編集: ANDがゼロInputCountであることの完了をチェックする「汚い」方法でさえ、実装するのは簡単ではないことに気づきました。targetBlockそのブロックはどこにありますか?targetBlock上記の2つの条件が満たされると、メッセージは明らかに処理されなくなるため、内に含めることはできませtargetBlockん。また、導入の完了状況を確認するとsourceBlock、多くの非効率性が生じます。

4

3 に答える 3

1

直接これを行うことはできないと思います。リフレクションを使用して一部のフィールドからこの情報を取得できる可能性はありますがprivate、そうすることはお勧めしません。

ただし、カスタムブロックを作成することでこれを行うことができます。単純な場合Complete():各メソッドを元のブロックに転送するブロックを作成するだけです。ただしComplete()、ログに記録される場合を除きます。

すべてのアイテムの処理がいつ完了したかを把握する場合は、ブロックを中間にリンクできますBufferBlock。このように、出力キューはすぐに空になります。したがってCompleted、内部ブロックをチェックすると、処理がいつ完了したかをかなり正確に測定できます。これは測定に影響しますが、それほど大きくないことを願っています。

別のオプションは、ブロックのデリゲートの最後にログを追加することです。このようにして、最後のアイテムの処理がいつ終了したかを確認できます。

于 2012-11-28T17:13:25.840 に答える
1

TransformBlockブロックがキュー内のすべてのメッセージの処理を完了したときに発生するProcessingCompletedイベントがあれば便利ですが、そのようなイベントはありません。以下は、この欠落を修正する試みです。このメソッドは、この「イベント」が発生したときに呼び出されるハンドラーをCreateTransformBlockEx受け入れます。Action<Exception>

ブロックが最終的に完了する前に、常にハンドラーを呼び出すことが目的でした。残念ながら、提供されたCancellationTokenものがキャンセルされた場合、最初に完了(キャンセル)が発生し、数ミリ秒後にハンドラーが呼び出されます。この不整合を修正するには、いくつかのトリッキーな回避策が必要であり、他の望ましくない副作用が発生する可能性があるため、そのままにしておきます。

public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockEx<TInput, TOutput>(Func<TInput, Task<TOutput>> transform,
    Action<Exception> onProcessingCompleted,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (onProcessingCompleted == null)
        throw new ArgumentNullException(nameof(onProcessingCompleted));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var transformBlock = new TransformBlock<TInput, TOutput>(transform,
        dataflowBlockOptions);
    var bufferBlock = new BufferBlock<TOutput>(dataflowBlockOptions);

    transformBlock.LinkTo(bufferBlock);
    PropagateCompletion(transformBlock, bufferBlock, onProcessingCompleted);
    return DataflowBlock.Encapsulate(transformBlock, bufferBlock);

    async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2,
        Action<Exception> completionHandler)
    {
        try
        {
            await block1.Completion.ConfigureAwait(false);
        }
        catch { }
        var exception = 
            block1.Completion.IsFaulted ? block1.Completion.Exception : null;
        try
        {
            // Invoke the handler before completing the second block
            completionHandler(exception);
        }
        finally
        {
            if (exception != null) block2.Fault(exception); else block2.Complete();
        }
    }
}

// Overload with synchronous lambda
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockEx<TInput, TOutput>(Func<TInput, TOutput> transform,
    Action<Exception> onProcessingCompleted,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateTransformBlockEx<TInput, TOutput>(
        x => Task.FromResult(transform(x)), onProcessingCompleted,
        dataflowBlockOptions);
}

ローカル関数のコードは、オプションを指定して呼び出された場合、組み込みメソッドのソースコードPropagateCompletion模倣します。LinkToPropagateCompletion = true

使用例:

var httpClient = new HttpClient();
var downloader = CreateTransformBlockEx<string, string>(async url =>
{
    return await httpClient.GetStringAsync(url);
}, onProcessingCompleted: ex =>
{
    Console.WriteLine($"Download completed {(ex == null ? "OK" : "Error")}");
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10
});
于 2020-06-08T11:01:27.680 に答える
0

まず、IPropagatorブロックをリーフターミナルとして使用するのは正しくありません。ただし、TargetBlockの出力バッファーで出力メッセージを非同期的にチェックし、それを消費してバッファーを空にすることで、要件を満たすことができます。

    `  BufferBlock<int> sourceBlock = new BufferBlock<int>();
       TransformBlock<int, int> targetBlock = new TransformBlock<int, int> 
       (element =>
       {

        return element * 2;
        });
        sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { 
        PropagateCompletion = true });

        //feed some elements into the buffer block
        for (int i = 1; i <= 100; i++)
        {
             sourceBlock.SendAsync(i);
        }

        sourceBlock.Complete();

        bool isOutputAvailable = await targetBlock.OutputAvailableAsync();
        while(isOutputAvailable)
        {
            int value = await targetBlock.ReceiveAsync();

            isOutputAvailable = await targetBlock.OutputAvailableAsync();
        }


        await targetBlock.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("Target Block Completed");//notify completion of the target block
        });

`

于 2020-01-01T08:23:04.183 に答える