パイプライン パターンの実装を使用してメッセージ コンシューマーをプロデューサーから分離し、コンシューマーが遅い問題を回避しています。
メッセージ処理段階[1]
で例外が発生した場合、メッセージは失われ、他のサービス/レイヤーにディスパッチされません[2]
。[3]
メッセージが失われないように、このような問題をどのように処理できますか。何が重要なのですか? メッセージの順序が混同されないため、上位のサービス/レイヤーはメッセージが入った順序でメッセージを取得します。他の中間体を含むアイデアがありますが、Queue
複雑に見えますか? 残念ながら、メソッドBlockingCollection<T>
の類似物を公開していないQueue.Peek()
ため、次の利用可能なメッセージを読み取ることができ、処理が成功した場合は doDequeue()
private BlockingCollection<IMessage> messagesQueue;
// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in
messagesQueue.GetConsumingEnumerable(cancellation))
{
const int maxRetries = 3;
int retriesCounter = 0;
bool isSent = false;
// On this point a message already is removed from messagesQueue
while (!isSent && retriesCounter++ <= maxRetries)
{
try
{
// [1] Preprocess a message
// [2] Dispatch to an other service/layer
clientProxyCallback.SendMessage(cachedMessage);
isSent = true;
}
catch(Exception exception)
{
// [3]
// logging
if (!isSent && retriesCounter < maxRetries)
{
Thread.Sleep(NSeconds);
}
}
if (!isSent && retriesCounter == maxRetries)
{
// just log, message is lost on this stage!
}
}
}
編集:これは IIS がホストする WCF サービスであり、クライアント コールバック コントラクトを介して Silverlight クライアント WCF プロキシにメッセージをディスパッチすることを忘れていました。
EDIT2:以下は、を使用してこれを行う方法Peek()
です。何か不足していますか?
bool successfullySent = true;
try
{
var item = queue.Peek();
PreProcessItem(item);
SendItem(item);
}
catch(Exception exception)
{
successfullySent = false;
}
finally
{
if (successfullySent)
{
// just remove already sent item from the queue
queue.Dequeue();
}
}
EDIT3:確かに、while ループ、bool フラグ、Queue
およびを使用して古いスタイルのアプローチを使用できますが、使用し AutoResetEvent
て同じことが可能かどうか疑問に思っています。のような新しいものの例と耐久性がないように見えるので、古いアプローチに戻らなければなりません。BlockingCollection
GetConsumingEnumerable()
Peek
BlockingCollection
GetConsumingEnumerable()