MQ キューからメッセージを取得して処理するように設計された websphere ベースのコードがあります。ほとんどの場合、コードは正常に動作しますが、数日ごとにコードは実行され続けますが、メッセージがまだキューにあるにもかかわらず、メッセージの取得が停止します。プロセスを元に戻すには、アプリケーションをバンプする必要があり、その後、すべてが正常に機能し始めます。
メッセージは非常に大きくなる可能性があります (メッセージあたり最大 4MB)。私は WAS 7 で実行しています。
エラーを表していると思われるメッセージや例外は表示されません。
これは、人々がコメントするためのコードです
public class BlipMqProcessor {
protected static final int ONE_SECOND = 1000;
protected static final int ONE_HOUR = 60 * 60 * ONE_SECOND;
protected static final int MQ_READ_TIMEOUT = Integer.parseInt(Constants.MQ_READ_TIMEOUT_IN_SECONDS) * ONE_SECOND;
protected static int previousMqReasonCode;
protected static long previousMqErrorTime;
private BlipXmlProcessor xmlProcessor;
// Member variables for MQ processing
protected MQQueueManager qMgr;
protected MQQueue queue;
protected MQGetMessageOptions gmo;
/**
* Constructs a new BlipMqProcessor with the given values.
*/
public BlipMqProcessor() {
this(new BlipXmlProcessor());
}
/**
* Constructs a new BlipMqProcessor with the given values.
* @param xmlProcessor the processor that will be used to create the
* staging table entries.
*/
public BlipMqProcessor(final BlipXmlProcessor xmlProcessor) {
super();
this.xmlProcessor = xmlProcessor;
}
/**
* Reads XML messages from the Constants.MQ_ACCESS_QUEUE_NAME
*
* @throws BlipModelException if there are any
*/
public void readFromMQ() throws BlipModelException {
try {
createNewConnectionToMQ();
while(true) {
MQMessage outputMessage = new MQMessage();
queue.get(outputMessage,gmo);
String blipModelXml = outputMessage.readLine();
BlipLogs.logXML("BlipREQ", "0", blipModelXml);
processMessage(blipModelXml);
qMgr.commit();
}
} catch (final MQException e) {
if (e.reasonCode != MQException.MQRC_NO_MSG_AVAILABLE) {
handleMqException(e);
}
} catch (final IOException e) {
throw new BlipModelException("MQ", "Error reading MQ message.", "BlipMqProcessor.readFromMQ", e);
} finally {
cleanupMQResources();
}
}
/**
* Clean up MQ resources.
*/
private void cleanupMQResources() {
// Close queue
if(queue != null) {
try {
queue.close();
}catch(final MQException e) {
BlipModelLogger.error("MQ", "BlipMqProcessor", "Problem closing queue: " + e);
}
}
// Disconnect queue manager
if(qMgr != null) {
try {
qMgr.disconnect();
} catch (final MQException e) {
BlipModelLogger.error("MQ", "BlipMqProcessor", "Problem disconnecting from qMgr: " + e);
}
}
}
protected void createNewConnectionToMQ() throws MQException {
try {
MQEnvironment.hostname = Constants.MQ_HOST;
MQEnvironment.channel = Constants.MQ_CHANNEL;
MQEnvironment.port = Integer.parseInt(Constants.MQ_PORT);
if(Constants.MQ_SSL_CIPHER_SUITE != null) {
MQEnvironment.sslCipherSuite = Constants.MQ_SSL_CIPHER_SUITE;
MQEnvironment.sslPeerName = Constants.MQ_SSL_PEER;
} else {
MQEnvironment.sslCipherSuite = "";
MQEnvironment.sslPeerName = "";
}
qMgr = new MQQueueManager(Constants.MQ_QMGR);
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF;
queue = qMgr.accessQueue(Constants.MQ_IN_ACCESS_QUEUE, openOptions);
gmo = new MQGetMessageOptions();
gmo.options = MQC.MQGMO_WAIT | MQC.MQGMO_SYNCPOINT | MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.waitInterval = MQ_READ_TIMEOUT;
} finally {
MQEnvironment.sslCipherSuite = "";
MQEnvironment.sslPeerName = "";
}
}
protected void handleMqException(final MQException e) {
long currentTime = System.currentTimeMillis();
long timeBetweenMqErrors = currentTime - previousMqErrorTime;
if (previousMqReasonCode != e.reasonCode || timeBetweenMqErrors > ONE_HOUR) {
previousMqReasonCode = e.reasonCode;
previousMqErrorTime = currentTime;
BlipModelLogger.error("MQ", "BlipMqProcessor", "MQException reading from Access Queue: " + e);
}
}
}