54

Teaser : 皆さん、この質問は再試行ポリシーを実装する方法に関するものではありません。これは、TPL Dataflow ブロックの正しい完了に関するものです。

この質問は、以前の質問ITargetBlock 内の再試行ポリシーの続きです。TransformBlockこの質問に対する答えは、 (ソース) とTransformManyBlock(ターゲット)を利用する @svick のスマートなソリューションでした。残っている唯一の問題は、このブロックを正しい方法で完了することです。最初にすべての再試行が完了するのを待ってから、ターゲット ブロックを完了します。これが私が最終的に得たものです(これは単なるスニペットです。スレッドセーフではないretriesセットにはあまり注意を払わないでください):

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    while (target.InputCount > 0 || retries.Any())
        await Task.Delay(100);

    target.Complete();
});

アイデアは、何らかのポーリングを実行し、処理を待機しているメッセージがまだあるかどうか、および再試行が必要なメッセージがないかどうかを確認することです。しかし、このソリューションでは、ポーリングのアイデアは好きではありません。

はい、再試行の追加/削除のロジックを別のクラスにカプセル化できます。たとえば、再試行のセットが空になったときに何らかのアクションを実行することもできますが、target.InputCount > 0条件を処理するにはどうすればよいですか? ブロックに保留中のメッセージがない場合に呼び出されるようなコールバックは存在しないため、target.ItemCount遅延の少ないループで検証するしかないようです。

これを達成するためのよりスマートな方法を知っている人はいますか?

4

2 に答える 2

2

おそらく、ManualResetEventがそのトリックを実行できます。

にパブリック プロパティを追加します。TransformManyBlock

private ManualResetEvent _signal  = new ManualResetEvent(false);
public ManualResetEvent Signal { get { return _signal; } }

そして、ここに行きます:

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);

            // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
            if(!retries.Any()) Signal.Set(); 
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);

                // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
                if(!retries.Any()) Signal.Set(); 
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    //Blocks the current thread until the current WaitHandle receives a signal.
    target.Signal.WaitOne();

    target.Complete();
});

どこにtarget.InputCount設定されているかわかりません。したがって、変更した場所target.InputCountに次のコードを追加できます。

if(InputCount == 0)  Signal.Set();
于 2013-10-07T07:31:53.670 に答える
1

hwcverwe の回答と JamieSee のコメントを組み合わせることは、理想的な解決策になる可能性があります。

まず、複数のイベントを作成する必要があります。

var signal  = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);

次に、オブザーバーを作成し、 にサブスクライブする必要があります。これによりTransformManyBlock、関連するイベントが発生したときに通知されます。

var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);

オブザーバブルは非常に簡単です。

private class RetryingBlockObserver<T> : IObserver<T> {
        private ManualResetEvent completedEvent;

        public RetryingBlockObserver(ManualResetEvent completedEvent) {                
            this.completedEvent = completedEvent;
        }

        public void OnCompleted() {
            completedEvent.Set();
        }

        public void OnError(Exception error) {
            //TODO
        }

        public void OnNext(T value) {
            //TODO
        }
    }

そして、シグナルまたは完了 (すべてのソース項目の枯渇)、またはその両方を待つことができます。

 source.Completion.ContinueWith(async _ => {

            WaitHandle.WaitAll(completedEvent, signal);
            // Or WaitHandle.WaitAny, depending on your needs!

            target.Complete();
        });

WaitAll の結果値を調べて、どのイベントが設定されたかを理解し、それに応じて対応することができます。必要に応じて設定できるように、他のイベントをコードに追加してオブザーバーに渡すこともできます。たとえば、エラーが発生したときに、動作を区別し、異なる対応をすることができます。

于 2014-09-03T12:28:49.660 に答える