8

1.) .net クライアントから、クライアントがサーバーに接続されている (つまり、送受信できる) かどうかをテストするにはどうすればよいですか?よりエレガントなソリューション。

2) 接続を開いたり、閉じたり、再度開いたりするにはどうすればよいですか? 上記の質問 1 を解決しようとして、接続を開いてから connection.Close() を呼び出すと、接続ファクトリから別の接続を取得できないことがわかりました (以下のコード フラグメントを参照)。エラー メッセージ XMSCC0008 が表示されます

非常に標準的なバニラ MQ 構成を使用しています。クライアントの接続方法は次のとおりです。

ISession session = MQAccess.GetSession(MQAccess.Connection);
IDestination destination = session.CreateTopic(SubTopicName);
Consumer = MQAccess.GetConsumer(session, destination);
Consumer.MessageListener = new MessageListener(HandleMQSubEvent);
MQAccess.Connection.Start();

ここで、MQAccess は小さなユーティリティ クラスです。

質問を編集して、MQAccess コードを追加しました。

public static class MQAccess
{
    public static readonly MQConfigurationSectionHandler ConfigSettings;
    public static readonly IConnectionFactory ConnectionFactory;

    private static readonly IConnection connection;
    public static IConnection Connection
    {
        get { return connection; }
    }

    static MQAccess()
    {
        ConfigSettings = (MQConfigurationSectionHandler)
            ConfigurationManager.GetSection("mq-configuration");

        XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        ConnectionFactory = factory.CreateConnectionFactory();
        ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
        ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
        ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);

        if (ConfigSettings.QueueManager == string.Empty)
        {
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
        }
        else
        {
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);
        }

        connection = GetConnection();
    }

    public static IConnection GetConnection()
    {
        return ConnectionFactory.CreateConnection();
    }

    public static ISession GetSession(IConnection connection)
    {
        return connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
    }

    public static IMessageProducer GetProducer(ISession session, IDestination destination)
    {
        return session.CreateProducer(destination);
    }

    public static IMessageConsumer GetConsumer(ISession session, IDestination destination)
    {
        return session.CreateConsumer(destination);
    }

    public static void MQPub(string TopicURI, string message)
    {
        using (var session = GetSession(Connection))
        {
            using (var destination = session.CreateTopic(TopicURI))
            {
                using (var producer = GetProducer(session, destination))
                {
                    producer.Send(session.CreateTextMessage(message));
                }
            }
        }
    }

    public static void MQPub(string TopicURI, IEnumerable<string> messages)
    {
        using (var session = GetSession(Connection))
        {
            using (var destination = session.CreateTopic(TopicURI))
            {
                using (var producer = GetProducer(session, destination))
                {
                    foreach (var message in messages)
                    {
                        producer.Send(session.CreateTextMessage(message));
                    }
                }
            }
        }
    }
}

編集: MQAccess クラスの名前を MQClient に変更しました。T Rob の提案に従ってインスタンス クラスにしました。上記のエラー メッセージで切断メソッドが引き続きクラッシュする

public class MQClient : IDisposable
{
    public MQConfigurationSectionHandler ConfigSettings { get; private set; }
    public IConnectionFactory ConnectionFactory { get; private set; }

    public IConnection Connection { get; private set;  }

    public IMessageConsumer Consumer { get; private set; }
    public IMessageProducer Producer { get; private set; }
    // Save sessions as fields for disposing and future subscription functionality
    private ISession ProducerSession;
    private ISession ConsumerSession;
    public string SubTopicName { get; private set; }
    public string PubTopicName { get; private set; }
    public bool IsConnected { get; private set; }
    public event Action<Exception> ConnectionError;
    private Action<IMessage> IncomingMessageHandler;

    public MQClient(string subTopicName, string pubTopicName, Action<IMessage> incomingMessageHandler)
    {
        // Dont put connect logic in the constructor.  If we lose the connection we may need to connect again.
        SubTopicName = subTopicName;
        PubTopicName = pubTopicName;
        IncomingMessageHandler = incomingMessageHandler;
    }

    public string Connect()
    {
        IsConnected = false;
        string errorMsg = string.Empty;

        ConfigSettings = (MQConfigurationSectionHandler)
                ConfigurationManager.GetSection("mq-configuration");

        XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        ConnectionFactory = factory.CreateConnectionFactory();
        ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
        ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
        ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);

        if (ConfigSettings.QueueManager == string.Empty)
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
        else
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);

        Connection = ConnectionFactory.CreateConnection();


        if (!string.IsNullOrEmpty(PubTopicName))
        {
            ProducerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
            Producer = ProducerSession.CreateProducer(ProducerSession.CreateTopic(PubTopicName));
        }

        if (!string.IsNullOrEmpty(SubTopicName) && IncomingMessageHandler != null)
        {
            ConsumerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
            Consumer = ConsumerSession.CreateConsumer(ConsumerSession.CreateTopic(SubTopicName));
            Consumer.MessageListener = new MessageListener(IncomingMessageHandler);
        }

        try
        {
            Connection.Start();
            Connection.ExceptionListener = new ExceptionListener(ConnectionExceptionHandler);
            IsConnected = true;
        }
        catch (TypeInitializationException ex)
        {
            errorMsg = "A TypeInitializationException error occured while attempting to connect to MQ.  Check the Queue configuration in App.config. The error message is: " + ex.Message; 
        }
        catch (IllegalStateException ex)
        {
            errorMsg = "An IllegalStateException error occured while attempting to connect to MQ.  Check the Queue configuration in App.config. The error message is: " + ex.Message; 
        }

        return errorMsg;
    }

    public void Disconnect()
    {
        if (Producer != null)
        {
            Producer.Close();
            Producer.Dispose();
            Producer = null;
        }

        if (ProducerSession != null)
        {
            // Call Unsubscribe here if subscription is durable

            ProducerSession.Close();
            ProducerSession.Dispose();
            ProducerSession = null;
        }

        if (Connection != null)
        {
            Connection.Stop();

            //if (Connection.ExceptionListener != null)
            //    Connection.ExceptionListener = null;

            // Per Shashi............
            //if (Consumer.MessageListener != null)
            //    Consumer.MessageListener = null;

            Connection.Close();
            Connection.Dispose();
            Connection = null;
        }

        if (Consumer != null)
        {

            if (Consumer.MessageListener != null)
                Consumer.MessageListener = null;

            Consumer.Close();
            Consumer.Dispose();
            Consumer = null;
        }


        if (ConsumerSession != null)
        {
            // Call Unsubscribe here if subscription is durable
            ConsumerSession.Close();
            ConsumerSession.Dispose();
            ConsumerSession = null;
        }

        IsConnected = false;
    }


    public void Publish(string message)
    {
        Producer.Send(ProducerSession.CreateTextMessage(message));
    }


    public void Publish(string[] messages)
    {
        foreach (string msg in messages)
            Publish(msg);
    }

    public void ConnectionExceptionHandler(Exception ex)
    {
        Disconnect(); // Clean up

        if (ConnectionError != null)
            ConnectionError(ex);
    }

    #region IDisposable Members
    private bool disposed;

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
                Disconnect();

            disposed = true;
        }
    }
    #endregion

}
4

2 に答える 2

8

問題はここにあります->where MQAccess is a small utility class.

質問の最初の部分は、接続がアクティブであるかどうかを確認する方法を尋ねます。WebSphere MQのXMSクラスは、非Javaプラットフォーム用のJMS仕様の実装です。isConnectedこれらはJMS仕様にかなり厳密に従っており、JMS仕様には、 XMSと同等の接続またはセッションのメソッドがありません。ただし、JMS例外をキャッチするには、すべてのGETおよびPUTアクティビティがtry/catchブロック内で発生する必要があります。(ここから常に印刷しますよlinkedExceptionね?)JMS例外がスローされると、アプリはそれを致命的として処理して停止するか、接続ファクトリを除くすべてのJMSオブジェクトを閉じ、数秒待ってから接続を再駆動します。順序。

質問の新しい情報に基づいて更新:
MQAccessクラスを投稿していただきありがとうございます。これにより、何が起こっているのかについてかなりの洞察が得られますが、質問のパート2にあるように、接続が閉じられて再び開かれる場所を示すコードはまだありません。

ただし、コードは、クラスインスタンスが構築されるときに、MQAccessクラスがのプライベートインスタンスを作成しICONNECTION connection、それがとして公開されることを示していMQAccess.GetConnectionます。現在投稿されているMQAccessクラスには、によって保持されている接続ハンドルを置き換えるパブリックまたはプライベートクラスメソッドがないため、が呼び出された場合、クラス内のそのconnectionオブジェクトインスタンスは、無効な接続ハンドルを保持した後も永久に保持されます。接続が閉じられると、そのインスタンスは事実上無効になります。新しい接続を取得するには、削除して再インスタンス化する必要があります。MQAccess.Connection.Close()IConnectionMQAccessMQAccessMQAccess

このMQAccessクラスは接続ファクトリを公開しているため、理論的には、元のオブジェクトを閉じた後でもMQAccess.GetConnection、クラスの外部から呼び出して有効な新しいオブジェクトを取得することができます。IConnectionただし、そのインスタンスはMQAccessクラスのスコープ外に存在するため、以降の呼び出しは、クラスの外部で作成された新しい接続インスタンスではなくMQAccess、無効なインスタンス変数を参照します。connection

接続を閉じて再作成する必要がある場合は、の内部から接続を管理することを検討してくださいMQAccess。ローテクなアプローチはMQAccess.Close()、既存の接続を閉じてすぐに呼び出す接続のメソッドを記述connection = GetConnection();し、プライベートconnection変数が常に有効な接続ハンドルを保持するようにすることです。

これで問題が解決しない場合は、接続を閉じて再作成するコードを投稿してください。

ちなみに、ネットワーク接続を介した非トランザクションセッションは、WMQを含む任意のJMSプロバイダーのメッセージを失ったり複製したりする可能性を開きます。これはあなたが意図したことでしたか?これが他のSO投稿にある理由をここで説明しまし

于 2012-10-19T02:50:56.483 に答える
5

T.Rob からのコメントに追加します。

質問 1:
のソース コードにアクセスできることを願っていますMQAccess。はいの場合MQAccess、接続がアクティブかどうかを示すプロパティを公開できます。アクセスできない場合は、そのクラスの作成者にこのプロパティを追加するよう依頼する必要があります。プロパティを設定/リセットするには、次の操作を実行できます。

1) createConnectionメソッドが正常に戻った後にプロパティを設定します。
2) 接続の例外リスナーを設定します。
3) 例外ハンドラーでプロパティをリセットします。理由コードを確認し、接続切断エラー (XMSWMQ1107 およびリンクされた例外に MQRC 2009 が含まれている可能性がある) である場合は、プロパティをリセットします。

質問 2
あなたの素性closingreopeningつながりを教えていただけると助かります。接続を閉じるための私の推奨事項は次のとおりです
。1)最初にconnection.Stop()を実行します。
2) メッセージ リスナーを削除します。基本的には、consumer.MessageListener = null を実行します。
3) 次に、connection.Close() を実行します。
4) 接続を行う = null

追加情報 これは、私がテストに使用したサンプルです。

    private void OnException(Exception ex)
    {
        XMSException xmsex = (XMSException)ex;
        Console.WriteLine("Got exception");
        // Check the error code.
        if (xmsex.ErrorCode == "XMSWMQ1107")
        {
            Console.WriteLine("This is a connection broken error");
            stopProcessing = true; // This is a class member variable
        }
    }

接続が作成されるメソッドで、例外リスナーを設定します。

        // Create connection.
        connectionWMQ = cf.CreateConnection();
        connectionWMQ.ExceptionListener = new ExceptionListener(OnException);

接続エラーが発生するたびに、例外リスナーが呼び出され、フラグが true に設定されます。

不要になったオブジェクトを破棄することをお勧めします。親子関係があり、Consumer、Producer などは Session の子であり、さらに Connection の子です。したがって、処分の順序は、子が最初で、親が次になる可能性があります。ただし、親が破棄されると、子も自動的に破棄されます。

于 2012-10-19T04:27:33.273 に答える