Spring WebSockets、Messaging、RabbitMQ を使用して、リアルタイムのイベントベースのアプリケーションに取り組んでいます。このアプリケーションでは、RabbitMQ に挿入された正確な順序でメッセージをクライアントに配信する必要があります。
「編集済み」
私たちの目標は、ブラウザからメッセージを受信し、サーバー上で順番に処理し (ルート パラメータによって決定される一意のオブジェクトに対して)、メッセージを充実させ、外部 STOMP MQ (RabbitMQ) を介してすべてのサブスクライブ ブラウザにブロードキャストすることです。
MessageMapping メソッドは次のとおりです。
@MessageMapping(/commands.{route}.{data}) public CommandMessage receiveCommand(CommandMessage メッセージ、プリンシパル プリンシパル) {
try {
// Get object to synch on using route
Object o = ...
syncrhonized(o) {
// Perform command on object
// Set message server sequence
message.setServerSequence(o.getAutoIncrementSequence());
// Log server sequence
log.debug("Message server sequence:" + message.getServerSequence());
// Send to external MQ for broadcasting to all subscribers
return message;
}
} catch (Exception e) {
...
}
return null;
}
CorePoolSize と maxPoolSize をそれぞれ 1 に設定して ClientInboundChannel と ClientOutboundChannels を構成すると、すべてのメッセージが正常に処理されます。
ClientInboundChannel の corePoolSize と maxPoolSize を増やすと、メッセージが誤った順序で MQ に到達します。ClientOutboundChannel に対して同じ増加を行うと、メッセージが間違った順序でブラウザーに到達します。
]
テストは、単一のブラウザー クライアントを使用して行われました。
StompBrokerRelayMessageHandler のトレースを有効にし、次のようなログ エントリを受け取りました。
2014-05-06 14:26:39 TRACE [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] 処理メッセージ = [ペイロード バイト [303]] [ヘッダー = {stompCommand = 送信、nativeHeaders = {コンテンツ タイプ = [ application/json;charset=UTF-8], destination=[/topic/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=application/json;charset=UTF- 8, simpSessionId=ehjcoxb3, id=a31d0e3d-12cc-f562-1ec2-e2d7ba0899eb, タイムスタンプ=1399400799940}] 2014-05-06 14:26:39 DEBUG [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler] メッセージを Java 6MessageHandler に転送します。ブローカー 2014-05-06 14:26:39 TRACE [clientInboundChannel-3] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] destination=/app/commands.BKN01.20140318 へのメッセージを無視.20140318 2014-05-06 14:26:39 TRACE [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] 処理メッセージ = [ペイロード バイト [314]] [ヘッダー = {stompCommand = 送信、nativeHeaders = {コンテンツ タイプ=[アプリケーション/json;文字セット=UTF-8], 宛先=[/トピック/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=アプリケーション/json;文字セット= UTF-8、simpSessionId=ehjcoxb3、id=3cc7b4ae-8ea4-ef8a-6c4d-c3bc1ed23bcd、タイムスタンプ=1399400799947}] 2014-05-06 14:26:39 DEBUG [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] メッセージをブローカーに転送しています
また、org.springframework.web.socket.messaging パッケージで StompSubProtocolHandler のトレースをオンにし、次のようなメッセージを受け取りました。
2014-05-07 10:58:58 TRACE [http-nio-8080-exec-5] oswsmStompSubProtocolHandler [StompSubProtocolHandler.java:180] クライアントからメッセージを受け取りました session=u8wrnsr6
どの情報も、message.serverSequence プロパティ (MQ に送信する前に設定される) をログの詳細のさまざまな ID にマップする簡単な方法を提供しません。
インバウンド/アウトバウンド チャネル スレッドを増やして順序を維持する方法はありますか? たとえば、チャネルを「ルート」に結び付けたり、スレッドを「ルート」に固定したりできますか?
助けてください。
ありがとうございました、
ダン