11

パイプライン パターンの実装を使用してメッセージ コンシューマーをプロデューサーから分離し、コンシューマーが遅い問題を回避しています。

メッセージ処理段階[1]で例外が発生した場合、メッセージは失われ、他のサービス/レイヤーにディスパッチされません[2][3]メッセージが失われないように、このような問題をどのように処理できますか。何が重要なのですか? メッセージの順序が混同されないため、上位のサービス/レイヤーはメッセージが入った順序でメッセージを取得します。他の中間体を含むアイデアがありますが、Queue複雑に見えますか? 残念ながら、メソッドBlockingCollection<T>の類似物を公開していないQueue.Peek()ため、次の利用可能なメッセージを読み取ることができ、処理が成功した場合は doDequeue()

private BlockingCollection<IMessage> messagesQueue;    

// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in 
             messagesQueue.GetConsumingEnumerable(cancellation))
{    
    const int maxRetries = 3;
    int retriesCounter = 0;
    bool isSent = false;

    // On this point a message already is removed from messagesQueue
    while (!isSent && retriesCounter++ <= maxRetries)
    {
        try
        {
           // [1] Preprocess a message
           // [2] Dispatch to an other service/layer    
           clientProxyCallback.SendMessage(cachedMessage);
           isSent = true;
        }                                
        catch(Exception exception)
        {
           // [3]   
           // logging
           if (!isSent && retriesCounter < maxRetries)
           {
              Thread.Sleep(NSeconds);
           }
        }            
    
        if (!isSent && retriesCounter == maxRetries)
        {
           // just log, message is lost on this stage!
        }
    }
}

編集:これは IIS がホストする WCF サービスであり、クライアント コールバック コントラクトを介して Silverlight クライアント WCF プロキシにメッセージをディスパッチすることを忘れていました。

EDIT2:以下は、を使用してこれを行う方法Peek()です。何か不足していますか?

bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}

EDIT3:確かに、while ループ、bool フラグ、Queueおよびを使用して古いスタイルのアプローチを使用できますが、使用し AutoResetEventて同じことが可能かどうか疑問に思っています。のような新しいものの例と耐久性がないように見えるので、古いアプローチに戻らなければなりません。BlockingCollectionGetConsumingEnumerable()PeekBlockingCollectionGetConsumingEnumerable()

4

4 に答える 4

9

中間キューを検討する必要があります。

BlockingCollection<T>その性質上、アイテムを「覗く」ことはできません。複数の消費者が存在する可能性があります。そのうちの 1 人はアイテムを覗くことができ、もう 1 人はそれを取ることができます。

于 2012-11-27T11:03:32.237 に答える
4

BlockingCollection<T>これはIProducerConsumerCollection<T>、 eg よりも一般的であり、メソッドConcurrentQueueを実装する必要がないという自由を実装者に与え(Try)Peekます。

ただし、TryPeek基になるキューをいつでも直接呼び出すことができます。

ConcurrentQueue<T> useOnlyForPeeking = new ConcurrentQueue<T>();
BlockingCollection<T> blockingCollection = new BlockingCollection<T>(useOnlyForPeeking);
...
useOnlyForPeeking.TryPeek(...)

ただし、を介してキューを変更してはならないことにuseOnlyForPeeking注意してください。そうしないblockingCollectionと、混乱してsがスローInvalidOperationExceptionTryPeekされる可能性がありますが、この並行データ構造で非変更を呼び出すことが問題になる場合は驚くでしょう。

于 2014-06-06T03:15:45.807 に答える
4

Dennisがコメントで述べているようBlockingCollection<T>に、インターフェイスの実装者にブロッキング ラッパーを提供しIProducerConsumerCollection<T>ます。

ご覧のとおり、 は設計上、実装に必要な やその他のメソッドをIProducerConsumerCollection<T>定義していません。Peek<T>これは、BlockingCollection<T>がそのままでは に類似したものを提供できないことを意味しPeekます。

考えてみれば、これにより、実装のユーティリティのトレードオフによって生じる並行性の問題が大幅に軽減されPeekます。どうすれば消費せずに消費できますか? Peek同時に、操作が完了するまでコレクションの先頭をロックする必要がありますが、これPeekは私と設計者がBlockingCollection<T>最適ではないと考えています。また、ある種の使い捨てのピークコンテキストが必要なため、実装が面倒で難しいと思います。

メッセージを消費し、その消費が失敗した場合は、それを処理する必要があります。それを別の失敗キューに追加したり、将来の再試行のために通常の処理キューに再追加したり、後世のために失敗をログに記録したり、コンテキストに適した他のアクションを実行したりできます。

メッセージを同時に消費したくない場合は、BlockingCollection<T>同時消費が必要ないため、使用する必要はありません。直接使用することもできますConcurrentQueue<T>が、追加から同期性を得ることができTryPeek<T>、単一のコンシューマーを制御するため安全に使用できます。消費が失敗した場合は、必要に応じて無限の再試行ループで消費を停止できますが、これには設計上の考慮が必要であることをお勧めします。

于 2012-11-27T11:31:11.653 に答える
0

ConcurrentQueue<T>代わりに使用できますTryDequeue()。メソッドがあります。

ConcurrentQueue<T>.TryDequeue(out T result)並行キューの先頭にあるオブジェクトを削除して返そうとします。要素が削除され、ConcurrentQueue の先頭から正常に返された場合は true を返します。

そのため、最初に Peek をチェックする必要はありません。

TryDequeue()スレッドセーフです:

ConcurrentQueue<T> すべての同期を内部で処理します。2 つのスレッドがまったく同時に TryDequeue(T) を呼び出した場合、どちらの操作もブロックされません。

私が理解している限り、キューが空の場合にのみ false を返します

キューに q.Enqueue("a"); などのコードが入力された場合。q.Enqueue("b"); q.Enqueue("c"); 2 つのスレッドが同時に要素をデキューしようとすると、1 つのスレッドが a をデキューし、もう 1 つのスレッドが b をデキューします。TryDequeue(T) の両方の呼び出しは true を返します。これは、両方とも要素をデキューできたためです。各スレッドが追加の要素をデキューするために戻った場合、スレッドの 1 つは c をデキューして true を返し、もう 1 つのスレッドはキューが空であることを検出して false を返します。

http://msdn.microsoft.com/en-us/library/dd287208%28v=vs.100%29.aspx

アップデート

TaskSchedulerおそらく、最も簡単なオプションはClassを使用することでしょう。これにより、すべての処理タスクをキューのアイテムにラップし、同期の実装を簡素化できます。

于 2012-11-27T10:56:06.030 に答える