1 つのコンシューマーを使用して hornetQ でセッションを作成し、プロデューサーを使用してキューに 4 つのメッセージを追加しました。この後、新しいコンシューマーを作成しました。
この消費者は古いメッセージについて知っていますか?
いいえの場合、XML で構成できますか?
以前のメッセージを取得できなかった新しいコンシューマーを作成しました。この動作が正しいかどうかを確認したかっただけですか? ドキュメントにヘルプが見つかりませんでした。
以下はコード スニペットです。
TextMessage receivedMessage = (TextMessage)consumer.receive();
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
//consumer.close();
MessageConsumer newConsumer = session.createConsumer(orderQueue);
receivedMessage = (TextMessage)newConsumer.receive();
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
consumer.close() 行のコメントを外すと、正常に動作します。
私の hornetq-jms.xml
<connection-factory name="NettyConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="/XAConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
<connection-factory name="NettyConnectionFactory">
<xa>false</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="/ConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
<entries>
<entry name="/XAThroughputConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
<xa>false</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
<entries>
<entry name="/ThroughputConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
接続ファクトリーのコード・スニペット
TransportConfiguration transportConfiguration = new
TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
Queue orderQueue = HornetQJMSClient.createQueue("MutationPipelineQueue");
getTransportConfiguration() のコード:
private synchronized static TransportConfiguration getTransportConfiguration() {
HashMap<String, TransportConfiguration> transportConfigurationMap = new HashMap<String, TransportConfiguration>();
TransportConfiguration tc = transportConfigurationMap.get("machinename:5455");
if(tc == null){
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,"machinename");
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,Integer.valueOf("5455"));
tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
transportConfigurationMap.put("machinename:5455", tc);
}
return tc;