1

MQ キューからメッセージを取得して処理するように設計された websphere ベースのコードがあります。ほとんどの場合、コードは正常に動作しますが、数日ごとにコードは実行され続けますが、メッセージがまだキューにあるにもかかわらず、メッセージの取得が停止します。プロセスを元に戻すには、アプリケーションをバンプする必要があり、その後、すべてが正常に機能し始めます。

メッセージは非常に大きくなる可能性があります (メッセージあたり最大 4MB)。私は WAS 7 で実行しています。

エラーを表していると思われるメッセージや例外は表示されません。

これは、人々がコメントするためのコードです

public class BlipMqProcessor {

    protected static final int ONE_SECOND = 1000;
    protected static final int ONE_HOUR = 60 * 60 * ONE_SECOND;
    protected static final int MQ_READ_TIMEOUT = Integer.parseInt(Constants.MQ_READ_TIMEOUT_IN_SECONDS) * ONE_SECOND;

    protected static int previousMqReasonCode;
    protected static long previousMqErrorTime;

    private BlipXmlProcessor xmlProcessor;

    // Member variables for MQ processing
    protected MQQueueManager qMgr;
    protected MQQueue queue;
    protected MQGetMessageOptions gmo;

    /**
     * Constructs a new BlipMqProcessor with the given values.
     */
    public BlipMqProcessor() {
        this(new BlipXmlProcessor());
    }

    /**
     * Constructs a new BlipMqProcessor with the given values.
     * @param xmlProcessor the processor that will be used to create the
     *      staging table entries.
     */
    public BlipMqProcessor(final BlipXmlProcessor xmlProcessor) {
        super();
        this.xmlProcessor = xmlProcessor;
    }

    /**
     * Reads XML messages from the Constants.MQ_ACCESS_QUEUE_NAME
     * 
     * @throws BlipModelException if there are any 
     */
    public void readFromMQ() throws BlipModelException {
        try {
            createNewConnectionToMQ();
            while(true) {
                MQMessage outputMessage = new MQMessage();
                queue.get(outputMessage,gmo);
                String blipModelXml = outputMessage.readLine();
                BlipLogs.logXML("BlipREQ", "0", blipModelXml);
                processMessage(blipModelXml);
                qMgr.commit();
            }
        } catch (final MQException e) {
            if (e.reasonCode != MQException.MQRC_NO_MSG_AVAILABLE) {
                handleMqException(e);
            }
        } catch (final IOException e) {
            throw new BlipModelException("MQ", "Error reading MQ message.", "BlipMqProcessor.readFromMQ", e);
        } finally {
            cleanupMQResources();
        }
    }


    /**
     * Clean up MQ resources.
     */
    private void cleanupMQResources() {
        // Close queue
        if(queue != null) {
           try {
              queue.close();
           }catch(final MQException e) {
               BlipModelLogger.error("MQ", "BlipMqProcessor", "Problem closing queue: " + e);
           }
        }
        // Disconnect queue manager
        if(qMgr != null) {
            try {
                qMgr.disconnect();
            } catch (final MQException e) {
                BlipModelLogger.error("MQ", "BlipMqProcessor", "Problem disconnecting from qMgr: " + e);
            }
        }
    }

    protected void createNewConnectionToMQ() throws MQException {
        try {
            MQEnvironment.hostname = Constants.MQ_HOST;
            MQEnvironment.channel = Constants.MQ_CHANNEL;
            MQEnvironment.port      = Integer.parseInt(Constants.MQ_PORT);
            if(Constants.MQ_SSL_CIPHER_SUITE != null) {
                MQEnvironment.sslCipherSuite = Constants.MQ_SSL_CIPHER_SUITE;
                MQEnvironment.sslPeerName = Constants.MQ_SSL_PEER;
            } else {
                MQEnvironment.sslCipherSuite = "";
                MQEnvironment.sslPeerName = "";
            }

            qMgr = new MQQueueManager(Constants.MQ_QMGR);
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF;
            queue = qMgr.accessQueue(Constants.MQ_IN_ACCESS_QUEUE, openOptions);
            gmo = new MQGetMessageOptions();
            gmo.options = MQC.MQGMO_WAIT | MQC.MQGMO_SYNCPOINT | MQC.MQGMO_FAIL_IF_QUIESCING;
            gmo.waitInterval = MQ_READ_TIMEOUT;
        } finally {
            MQEnvironment.sslCipherSuite = "";
            MQEnvironment.sslPeerName = "";
        }
    }

    protected void handleMqException(final MQException e) {
        long currentTime = System.currentTimeMillis();
        long timeBetweenMqErrors = currentTime - previousMqErrorTime;
        if (previousMqReasonCode != e.reasonCode || timeBetweenMqErrors > ONE_HOUR) {
            previousMqReasonCode = e.reasonCode;
            previousMqErrorTime = currentTime;
            BlipModelLogger.error("MQ", "BlipMqProcessor", "MQException reading from Access Queue: " + e);
        }
    }


}
4

1 に答える 1

0

readFromMQ メソッドを変更します。

public void readFromMQ() throws BlipModelException {
    try {
        createNewConnectionToMQ();
        while(true) {
          try {
            MQMessage outputMessage = new MQMessage();
            queue.get(outputMessage,gmo);
            String blipModelXml = outputMessage.readLine();
            BlipLogs.logXML("BlipREQ", "0", blipModelXml);
            processMessage(blipModelXml);
            qMgr.commit();
          } catch (MQException e) {
            if (e.reasonCode != MQException.MQRC_NO_MSG_AVAILABLE) {
              throw e;
            }
          }
        }
    } catch (final MQException e) {
      handleMqException(e);
    } catch (final IOException e) {
        throw new BlipModelException("MQ", "Error reading MQ message.", "BlipMqProcessor.readFromMQ", e);
    } finally {
        cleanupMQResources();
    }
}

迅速な解決策になります。しかしエレガントではありません。ここには、リファクタリングの余地がたくさんあります。

何が起こっているかというと、実際には MQRC_NO_MSG_AVAILABLE を受け取っているということです。これは、おかしなことに、取得できる別のメッセージがないためです (考えているときではなく、ある時点で)。その例外を無視することを決定した時点で、すでに while(true) ループから抜け出しています。

queue.getCurrentQueueDepth()は (明らかに) パフォーマンスが低く、エイリアス キューまたはクラスター上に存在するキューでは機能しないため、使用できません。これで終わりです。それは吸う。

于 2012-06-29T19:31:40.247 に答える