1

Spring Integration、Websocket、RabbitMQ サーバーを使用してデモ チャット アプリケーションを開発しています。単一サーバーでアプリケーションを実行すると、正常に動作します。

プロデューサーによって送信されたすべてのメッセージは、コンシューマーによって受信されます。しかし、クラスター化された環境で実行すると、メッセージはサーバーでランダムに受信され、すべてのサーバーで受信されるわけではありません。

自分のコードに問題があるのか​​、構成が原因なのかはわかりません。

ロガーで確認してみました。ロガーは、メッセージが正常に送信されたが、すべてのサーバーで受信されたわけではなく、1 つのサーバーのみで受信されたことを示しています。

以下は、構成とともに使用しているクラスです。

ChatController.java

@Controller
public class ChatController {

private Logger logger = LoggerFactory.getLogger(getClass());

@RequestMapping(value = "/", method = RequestMethod.GET)
public String viewApplication() {
    return "index";
}
@Autowired
private AmqpTemplate reviewTemplate;

@MessageMapping(value = "/random")
public void sendDataUpdates(OutputMessage message) {
    try {
        System.out.println("<<<<<<< Sending Message <<<<<<<<<<" + message.getMessage() + "   ID : " + message.getId());
        sendMessages(message);
    } catch (Exception ex) {
        System.out.println("Exception ------>>>>>> " + ex);
    }
}

private void sendMessages(OutputMessage msg) {
    reviewTemplate.convertAndSend(msg);
}

}

RandomDataGenerator.java

@Component
public class RandomDataGenerator implements
    ApplicationListener<BrokerAvailabilityEvent> {

private final MessageSendingOperations<String> messagingTemplate;

@Autowired
public RandomDataGenerator(
        final MessageSendingOperations<String> messagingTemplate) {
    this.messagingTemplate = messagingTemplate;
}

@Override
public void onApplicationEvent(final BrokerAvailabilityEvent event) {
}

public void onMessage(GenericMessage<?> msg) {
    try {
        System.out.println("Message ====== >>>>> " + msg);
        OutputMessage message = (OutputMessage) msg.getPayload();
        this.messagingTemplate.convertAndSend(
                "/data", message);    

        System.out.println("Message ====== >>>>> " + message.getMessage());           
    } catch (Exception ex) {
        System.out.println("==================== " + ex);
    }
    finally {
    }
}    

}

webapp-config.xml

<rabbit:annotation-driven />

<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory" auto-startup="true" />

<rabbit:connection-factory id="rabbitConnectionFactory" 
                           connection-timeout="5000" publisher-returns="true"
                           channel-cache-size="32" cache-mode="CHANNEL"
                           host="localhost" username="guest" password="guest" port="5672" 
                           publisher-confirms="true" requested-heartbeat="5000" />

<rabbit:fanout-exchange name="reviewExchange" id="reviewExchange" durable="true">
    <rabbit:bindings>
        <rabbit:binding queue="reviewQueue"></rabbit:binding>
    </rabbit:bindings>        
</rabbit:fanout-exchange>

<rabbit:direct-exchange name="directExchange" id="directExchange" durable="true" />


<rabbit:template id="reviewTemplate" connection-factory="rabbitConnectionFactory"
                 encoding="UTF-8" exchange="reviewExchange" queue="reviewQueue"       
                 routing-key="reviewKey" />

<rabbit:queue id="reviewQueue" name="reviewQueue" durable="true" />    

<bean id="customMessageListener" class="de.kimrudolph.tutorials.utils.RandomDataGenerator" />

<int:publish-subscribe-channel id="reviewPubSubChannel" />

<amqp:outbound-channel-adapter channel="reviewPubSubChannel"
                               amqp-template="reviewTemplate" exchange-name="reviewExchange"/>    

<int:channel id="reviewInboundChannel" /> 

<amqp:inbound-channel-adapter channel="reviewInboundChannel" queue-names="reviewQueue" connection-factory="rabbitConnectionFactory" />

<int:service-activator input-channel="reviewInboundChannel" id="reviewQueueServiceActivator" ref="customMessageListener" method="onMessage" />


<websocket:message-broker application-destination-prefix="/app">
    <websocket:stomp-endpoint path="/random">
        <websocket:sockjs />
    </websocket:stomp-endpoint>
    <websocket:simple-broker prefix="/data" />
    <websocket:client-inbound-channel>
        <websocket:executor core-pool-size="200" keep-alive-seconds="300" max-pool-size="1000" queue-capacity="5000" />
    </websocket:client-inbound-channel>
    <websocket:client-outbound-channel>
        <websocket:executor core-pool-size="200" keep-alive-seconds="300" max-pool-size="1000" queue-capacity="5000" />
    </websocket:client-outbound-channel>
</websocket:message-broker>

プロキシ サーバーの構成

worker.list=loadbalancer,status  
 worker.tomcat1.port=8003  
 worker.tomcat1.host=localhost  
 worker.tomcat1.type=ajp13  

 worker.tomcat2.port=8008  
 worker.tomcat2.host=localhost  
 worker.tomcat2.type=ajp13  

 worker.tomcat3.port=8013  
 worker.tomcat3.host=localhost  
 worker.tomcat3.type=ajp13  

 worker.tomcat1.lbfactor=1  
 worker.tomcat2.lbfactor=1  
 worker.tomcat3.lbfactor=1 

 worker.loadbalancer.type=lb  
 worker.loadbalancer.balance_workers=tomcat1,tomcat2,tomcat3
 worker.loadbalancer.sticky_session=1

 worker.status.type=status 


JkWorkersFile conf/workers.properties
JkLogFile logs/mod_jk.log 
JkLogLevel error 
JkMount /spring-mvc-websockets-master loadbalancer 
JkMount /spring-mvc-websockets-master/* loadbalancer
JkMount /SpringChatExample loadbalancer 
JkMount /SpringChatExample/* loadbalancer

以下は、問題の原因をテストして試すことができるサンプル アプリケーションのリンクです。

デモ申し込み

4

1 に答える 1

1

正解です。キューからメッセージを受信できるのは 1 つのコンシューマーだけだからです。

すべての受信側アプリケーションが同じように構成されているqueueため、Broker には 1 つしかありません。bindingfanout-exchange

それを達成するには、定義AnonymousQueueに を指定するだけで、 を使用できます。この場合、所有しているクラスター メンバーと同じ数のバインディングが作成されます。id<rabbit:queue>fanout-exchange

AnonymousQueueauto-delete利点があります。これは、クラスター メンバーがキューを停止し、そのバインドが削除されるときを意味します。この場合、SpEL を使用して構成する必要がありますqueue-names

またはランダムqueue nameを生成して使用しますauto-delete="true"

<bean id="inetAddress" class="java.net.InetAddress" factory-method="getLocalHost"/>

<rabbit:queue id="settingsReplyQueue" name="#inetAddress.toString()}"
       auto-delete="true"/>

ここでも同じ SpEL フックがqueue-names.

于 2015-03-16T16:11:54.343 に答える