最近、Spring Cloud Stream と RabbitMQ バインダーをいじり始めました。
2 つのサービスがメッセージを渡したいときにすべてを正しく理解していれば、一方はメッセージを送信するようにソースを構成し、もう一方はメッセージを受信するようにシンクを構成する必要があります。両方が同じチャネルを使用する必要があります。
という名前のチャンネルtestchannel
があります。ただし、ソースが RabbitMQ バインディングを作成したことに気付きました。
- 交換
testchannel
、 - ルーティングキー
testchannel
, - キュー
testchannel.default
(持続可能)、
シンクがRabbitMQバインディングを作成している間:
- 交換
testchannel
、 - ルーティングキー
#
, - キュー
testchannel.anonymous.RANDOM_ID
(排他的)。
簡潔にするために、接頭辞をスキップしました。
両方のアプリケーションを実行したとき。最初に交換にメッセージを送信しtestchannel
、それが両方のキューにルーティングされます (ルーティング キーは であると仮定しますtestchannel
)。2 番目のアプリケーションはランダム キューからメッセージを消費しますが、デフォルト キューからのメッセージは消費されません。
私の他の問題は - 2 番目のアプリはsinkoutput
のみを使用していますが、何も指定していないため、デフォルトである出力チャネルのバインディングも作成します。
同じ Gradle スクリプトを使用して両方のアプリをビルドします。
buildscript {
ext {
springBootVersion = '1.3.2.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'spring-boot'
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
compile(
'org.springframework.cloud:spring-cloud-starter-stream-rabbit',
)
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
}
}
最初のアプリ プロパティ:
server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
最初のアプリのソース コード:
@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}
2 番目のアプリのプロパティ:
server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
2 番目のアプリのソース コード:
@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}
だから私の質問はです。
- ソースとシンクが同じチャネルを使用して、結果として同じブローカー キューを使用するべきではありませんか? それを達成するための適切な構成は何ですか?(私の目標は複数のシンクサービス インスタンスを持つことですが、メッセージを消費するのは 1 つだけです。)
- シンクのみを使用している場合、フレームワークは出力バインディングを作成する必要がありますか? はいの場合、それを無効にする方法。