0

最近、Reply-To パターンを Apache NMS /ActiveMQ で動作させようとしており、一時キューの名前のみを使用して一時キューにメッセージを送信する際に問題が発生しています。

このプロジェクトは、バスから要求を取得し、それらを (複雑なルーティング基準に基づいて) 別のプロセス/ランタイムに送信して要求を処理するディスパッチャ サービスです。次に、この別のプロセッサーが応答先キュー名と相関 ID を使用して応答を作成し、それを同じブローカー上で別の接続を介して元のリクエスターに送信します。

問題は、メッセージの NMSReplyTo ヘッダーからの IDestination オブジェクト参照がある場合にのみ、一時キュー (またはトピック) に送信できるように見えることです。その参照が失われると、単にその名前を使用して一時キュー (またはトピック) にメッセージを送信する方法はありません。

この問題を説明するのは、メッセージ キューをリッスンし、NMS Reply-To ヘッダーの内容を使用してリクエスタに応答を発行する単純な「Pong」サービスです。ProcessMessage(string,string) メソッドを呼び出すだけで、別のプロセスへのリクエストのディスパッチを模倣します。

    using System;
    using Apache.NMS;

    namespace PongService
    {
        /// <summary>Simple request dispatcher which mimics dispatching requests to other workers in "The Cloud"</summary>
        class PongService
        {
            static ISession session = null;
            static IMessageProducer producer = null;

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("Connecting to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);
                IConnection connection = factory.CreateConnection();
                session = connection.CreateSession();

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Using destination: " + destination);

                producer = session.CreateProducer(null);

                IMessageConsumer consumer = session.CreateConsumer(destination);

                connection.Start();

                consumer.Listener += new MessageListener(OnMessage);

                Console.WriteLine("Press any key to terminate Pong service . . .");

                // loop until a key is pressed
                while (!Console.KeyAvailable)
                {
                    try { System.Threading.Thread.Sleep(50); }
                    catch (Exception ex) { Console.Error.WriteLine(ex.Message + "\r\n" + ex.StackTrace); }
                } // loop

                Console.Write("Closing connection...");
                consumer.Close();
                producer.Close();
                session.Close();
                connection.Close();
                Console.WriteLine("done.");
            }


            /// <summary>Consumer call-back which receives requests and dispatches them to available workers in 'The Cloud'</summary>
            /// <param name="receivedMsg">The message received on the request queue.</param>
            protected static void OnMessage(IMessage receivedMsg)
            {
                // mimic the operation of passing this request to an external processor which can connect 
                // to the broker but will not have references to the session objects including destinations
                Console.WriteLine("Sending request to an external processor");
                ProcessMessage(receivedMsg.NMSReplyTo.ToString(), receivedMsg.NMSCorrelationID.ToString());
            }


            /// <summary>Models a worker in another process/runtime.</summary>
            /// <param name="queuename">Where to send the results of processing</param>
            /// <param name="crid">Correlation identifier of the request.</param>
            protected static void ProcessMessage(string queuename, string crid)
            {
                ITextMessage response = session.CreateTextMessage("Pong!");
                response.NMSCorrelationID = crid;

                IDestination destination = session.GetQueue(queuename);

                Console.WriteLine("Sending response with CRID of '" + crid + "' to " + queuename + "'");
                try
                {
                    producer.Send(destination, response);
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine("Could not send response: " + ex.Message);
                }

            }

        }

    }

今、クライアントのために。単純に一時的なキューを作成し、リッスンを開始してから、「Pong」サービスがリッスンしているキューにリクエストを送信します。要求メッセージには、一時キューの IDestation が含まれています。

    using System;
    using System.Threading;
    using Apache.NMS;
    using Apache.NMS.Util;

    namespace PongClient
    {
        class PongClient
        {
            protected static AutoResetEvent semaphore = new AutoResetEvent(false);
            protected static ITextMessage message = null;
            protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(3);

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("About to connect to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);

                IConnection connection = factory.CreateConnection();
                ISession session = connection.CreateSession();

                IDestination temporaryDestination = session.CreateTemporaryQueue();
                Console.WriteLine("Private destination: " + temporaryDestination);

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Service destination: " + destination);


                IMessageConsumer consumer = session.CreateConsumer(destination);
                consumer.Listener += new MessageListener(OnMessage);

                IMessageProducer producer = session.CreateProducer(destination);

                connection.Start();

                // Send a request message
                ITextMessage request = session.CreateTextMessage("Ping");
                request.NMSCorrelationID = Guid.NewGuid().ToString();
                request.NMSReplyTo = temporaryDestination;
                producer.Send(request);

                // Wait for the message
                semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
                if (message == null)
                {
                    Console.WriteLine("Timed-Out!");
                }
                else
                {
                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                    Console.WriteLine("Received message with text: " + message.Text);
                }
            }



            protected static void OnMessage(IMessage receivedMsg)
            {
                message = receivedMsg as ITextMessage;
                semaphore.Set();
            }
        }
    }

Pong プロセスは正しく動作しているように見えますが、Reply-To ヘッダーで指定されたキューとは別の完全に新しいキューを作成するだけです。

関連するテクノロジーのバージョンは次のとおりです。

  • Apache.NMS.ActiveMQ v1.5.1
  • Apache.NMS API v1.5.0
  • アクティブMQ 5.5.0
  • C#.NET 3.5

この質問は、同様の問題を説明するこの投稿に関連しています。これらの例が、そのリクエストの問題を明確にするのにも役立つことを願っています。

解決策への助けや洞察をいただければ幸いです。

4

3 に答える 3

1

実際には、PongClient からの要求メッセージに返信先ヘッダーを設定していません。

これを試して:

ITextMessage request = session.CreateTextMessage("Ping");
request.NMSCorrelationID = Guid.NewGuid().ToString();
request.NMSReplyTo = temporaryDestination;
producer.Send(request);
于 2011-09-07T15:37:28.857 に答える
0

返信先としてトピックを使用し、NMSCorrelationID に基づいてコンシューマー フィルターを設定することをお勧めします。これは、一時キューに多くの不満を感じた後、私が移行した実装です。実際には多くの利点があります。

  1. サーバーでの集中的なリソース使用を削減します (一時キューを構築/分解する必要はありません)。
  2. 別のコンシューマーを使用して、返された応答を監視できます (一時キュー内を「覗く」ことはできません)。
  3. また、特定のトークン ID (接続間で失われている) の代わりに論理名を介してトピックを渡すことができるため、はるかに信頼性が高くなります。
于 2013-04-06T00:34:23.760 に答える
0

渡されたものを使用する必要がありIDestinationます。

通話中

IDestination destination = session.GetQueue(queuename); 

は少し悪いです。内部では、CreateTemporaryQueue() を呼び出し、既存の一時キューを同じ名前の新しい一時キューに置き換えます。

于 2012-02-20T13:55:05.563 に答える