2

Spring WebSockets、Messaging、RabbitMQ を使用して、リアルタイムのイベントベースのアプリケーションに取り組んでいます。このアプリケーションでは、RabbitMQ に挿入された正確な順序でメッセージをクライアントに配信する必要があります。

「編集済み」

私たちの目標は、ブラウザからメッセージを受信し、サーバー上で順番に処理し (ルート パラメータによって決定される一意のオブジェクトに対して)、メッセージを充実させ、外部 STOMP MQ (RabbitMQ) を介してすべてのサブスクライブ ブラウザにブロードキャストすることです。

MessageMapping メソッドは次のとおりです。

@MessageMapping(/commands.{route}.{data}) public CommandMessage receiveCommand(CommandMessage メッセージ、プリンシパル プリンシパル) {

try {

  // Get object to synch on using route
  Object o = ...

  syncrhonized(o) {

    // Perform command on object

    // Set message server sequence
    message.setServerSequence(o.getAutoIncrementSequence());

    // Log server sequence
    log.debug("Message server sequence:" + message.getServerSequence());

    // Send to external MQ for broadcasting to all subscribers
    return message;
  }

} catch (Exception e) {
  ...
}

return null;

}

CorePoolSize と maxPoolSize をそれぞれ 1 に設定して ClientInboundChannel と ClientOutboundChannels を構成すると、すべてのメッセージが正常に処理されます。

ClientInboundChannel の corePoolSize と maxPoolSize を増やすと、メッセージが誤った順序で MQ に到達します。ClientOutboundChannel に対して同じ増加を行うと、メッセージが間違った順序でブラウザーに到達します。

]

テストは、単一のブラウザー クライアントを使用して行われました。

StompBrokerRelayMessageHandler のトレースを有効にし、次のようなログ エントリを受け取りました。

2014-05-06 14:26:39 TRACE [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] 処理メッセージ = [ペイロード バイト [303]] [ヘッダー = {stompCommand = 送信、nativeHeaders = {コンテンツ タイプ = [ application/json;charset=UTF-8], destination=[/topic/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=application/json;charset=UTF- 8, simpSessionId=ehjcoxb3, id=a31d0e3d-12cc-f562-1ec2-e2d7ba0899eb, タイムスタンプ=1399400799940}] 2014-05-06 14:26:39 DEBUG [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler] メッセージを Java 6MessageHandler に転送します。ブローカー 2014-05-06 14:26:39 TRACE [clientInboundChannel-3] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] destination=/app/commands.BKN01.20140318 へのメッセージを無視.20140318 2014-05-06 14:26:39 TRACE [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] 処理メッセージ = [ペイロード バイト [314]] [ヘッダー = {stompCommand = 送信、nativeHeaders = {コンテンツ タイプ=[アプリケーション/json;文字セット=UTF-8], 宛先=[/トピック/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=アプリケーション/json;文字セット= UTF-8、simpSessionId=ehjcoxb3、id=3cc7b4ae-8ea4-ef8a-6c4d-c3bc1ed23bcd、タイムスタンプ=1399400799947}] 2014-05-06 14:26:39 DEBUG [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] メッセージをブローカーに転送しています

また、org.springframework.web.socket.messaging パッケージで StompSubProtocolHandler のトレースをオンにし、次のようなメッセージを受け取りました。

2014-05-07 10:58:58 TRACE [http-nio-8080-exec-5] oswsmStompSubProtocolHandler [StompSubProtocolHandler.java:180] クライアントからメッセージを受け取りました session=u8wrnsr6

どの情報も、message.serverSequence プロパティ (MQ に送信する前に設定される) をログの詳細のさまざまな ID にマップする簡単な方法を提供しません。

インバウンド/アウトバウンド チャネル スレッドを増やして順序を維持する方法はありますか? たとえば、チャネルを「ルート」に結び付けたり、スレッドを「ルート」に固定したりできますか?

助けてください。

ありがとうございました、

ダン

4

1 に答える 1

2

[編集]

ありがとう、わかりました。実際、現時点では、外部ブローカーからのメッセージがまったく同じ順序でクライアントに配信され、クライアントからのメッセージがまったく同じ順序で外部ブローカーに送信されることを保証する方法はありません。

セッション内のすべてのメッセージにインデックスを割り当て、順序を強制するために必要に応じてチェックしてバッファリングすることもできますが、これは新しい機能です。JIRA で自由にリクエストを作成してください。

今のところ、すべてのメッセージをチェックしてどのセッションに属しているかを確認する ChannelInterceptor の作成を検討し、それを送信する前に (clientInboundChannel または clientOutboundChannel で) インクリメンタル インデックス ヘッダーを設定することができます。次に、StompSubProtocolHandler と StompBrokerRelayMessageHandler を拡張して、インデックス ヘッダーをチェックし、メッセージの順序を強制しようとします。

于 2014-05-06T13:42:53.773 に答える