0

2つ質問があります。以下はシナリオです -

プロセス A とプロセス B の 2 つの異なるプロセスがあります。プロセス A は、メッセージをメッセージ キューにエンキューします。プロセス B は、メッセージ キューからメッセージをデキューします。

1) プロセス B はしばらくシャットダウンしますが、プロセス A はメッセージをキューに入れ続けます。プロセス B がライブに戻ったとき、プロセス B がオフラインだったときにプロセス A によってポストされたメッセージ キュー内のメッセージをデキューする方法は?

2) メッセージをデキューするには複数のプロセス B が必要なため、使用しているキューは複数のコンシューマ キューです。設計の背後にある理由は、プロセス B の 1 つが停止した場合でも、他のプロセス B が引き続きメッセージを処理できるためです。同時に、プロセス B の 1 つのインスタンスがメッセージを取得した場合、そのメッセージを処理しないように他のプロセス B に通知する必要があります。

サンプルが見つかりませんでした。どんな助けでも大歓迎です。

4

1 に答える 1

0

かなり似た要件を持つプロジェクトを完了しました。

問題 1) WCF Restful サービスを呼び出して定期的に実行する Windows サービス タイマーを作成しました。次に、WCF サービスは、キューに入れられたものをすべてデキューします (呼び出しごとに最大 500 メッセージ)。エンキューされたものはすべて自動的に順番に処理される必要があるため、このタイマーが再起動されて停止した場合でも、中断したところから再開されます。

問題 2) Oracle から CouchBase にデータをレプリケートしていたので、プロセスが開始されたときの取得用のタイムスタンプと、CouchBase に既に保存されているデータのタイムスタンプがありました。最初のデータが後者より古い場合、保存されません。(これは競合状態を処理するためでした)。

Oracleには、何かがキューに入れられたときにIDとキューに入れられた時間を2番目のテーブルにコピーするトリガーもありました。この 2 番目のテーブルは定期的にチェックされ、アイテムがキュー テーブル内でデキューされたが、WCF サービスによって特定の時間枠内にこれを反映するように 2 番目のテーブルが更新されていない場合、プロセスで何かが失敗したため、データが再度キューに入れられます。 .

役立つ場合は、odp.net を使用した wcf restful サービスの例を次に示します。

OracleAQQueue _queueObj;
OracleConnection _connObj;
_connString = ConfigurationManager.ConnectionStrings["connectionstring"].ToString();
_connObj = new OracleConnection(_connString);
_queueObj = new OracleAQQueue("QUEUENAME", _connObj);
_connObj.Open();

  int i = 0;
  bool messageAvailable = true;

  while (messageAvailable && i < 500)
  {
    OracleTransaction _txn = _connObj.BeginTransaction();
    //Makes dequeue part of transaction
    _queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
    _queueObj.DequeueOptions.ConsumerName = "CONSUMERNAME"
    try
    {
         //Wait  number of seconds for dequeue, default is forever
         _queueObj.DequeueOptions.Wait = 2;
         _queueObj.MessageType = OracleAQMessageType.Raw;
         _queueObj.DequeueOptions.ProviderSpecificType = true;
         OracleAQMessage _depMsq = _queueObj.Dequeue();
         var _binary = (OracleBinary)_depMsq.Payload;
         byte[] byteArray = _binary.Value;
         _txn.Commit();
     }
     catch (Exception ex)
     {
         //This catch will always fire when all messages have been dequeued
         messageAvailable = false;
         if (ex.Message.IndexOf("end-of-fetch during message dequeue") == -1)
            {
             //Actual error present. 
             log.Info("Problem occurred during dequeue process : " + ex.Message);
            }
     }
  }

    _queueObj.Dispose();
    _connObj.Close();
    _connObj.Dispose();
    _connObj = null;
于 2013-01-31T15:51:11.887 に答える