スタンドアロンで実行されている HornetQ (v2.2.13) コンシューマーを使用して、JBOSS サーバー (7.1.1 final) によって発行された永続トピックを読み取ります。すべてが数時間 (2 ~ 6 時間) うまくいき、コンシューマーはトピックからのメッセージの受信を停止します。サーバー上のログ ファイルから、データがパイプに送り込まれ続けていることがわかりますが、コンシューマー ログ ファイルは、クライアントがデータの読み取りを停止したことを示しています。クライアントが最後にトピックからメッセージを読み取ったのは 12:00:00 であり、サーバー ログが最後にトピックにメッセージをプッシュしたのは 14:00:00 であると述べていることから推測しました。
HornetQ 構成を調整してみましたが、持続的に機能していないようです。
トピックと通信するために使用するコードは次のとおりです。
private TransportConfiguration getTC(String hostname) {
Map<String,Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, hostname);
params.put(TransportConstants.PORT_PROP_NAME, 5445);
TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
return tc;
}
private Topic createDestination(String destinationName) {
Topic topic = new HornetQTopic(destinationName);
return topic;
}
private HornetQConnectionFactory createCF(TransportConfiguration tc) {
HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType .CF, tc);
return cf == null ? null : cf;
}
セッションを作成して開始するコード スニペット:
TransportConfiguration tc = this.getTC(this.hostname);
HornetQConnectionFactory cf = this.createCF(tc);
cf.setRetryInterval(4000);
cf.setReconnectAttempts(10);
cf.setConfirmationWindowSize(1000000);
Destination destination = this.createDestination(this.topicName);
logger.info("Starting Topic Connection");
try {
this.connection = cf.createConnection();
connection.start();
this.session = connection.createSession(transactional, ackMode);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
logger.info("Started topic connection");
} catch (Exception ex) {
ex.printStackTrace();
logger.error("EXCEPTION!");
}