3

最近、Spring Cloud Stream と RabbitMQ バインダーをいじり始めました。

2 つのサービスがメッセージを渡したいときにすべてを正しく理解していれば、一方はメッセージを送信するようにソースを構成し、もう一方はメッセージを受信するようにシンクを構成する必要があります。両方が同じチャネルを使用する必要があります。

という名前のチャンネルtestchannelがあります。ただし、ソースが RabbitMQ バインディングを作成したことに気付きました。

  • 交換testchannel
  • ルーティングキーtestchannel,
  • キューtestchannel.default(持続可能)、

シンクがRabbitMQバインディングを作成している間:

  • 交換testchannel
  • ルーティングキー#,
  • キューtestchannel.anonymous.RANDOM_ID(排他的)。

簡潔にするために、接頭辞をスキップしました。

両方のアプリケーションを実行したとき。最初に交換にメッセージを送信しtestchannel、それが両方のキューにルーティングされます (ルーティング キーは であると仮定しますtestchannel)。2 番目のアプリケーションはランダム キューからメッセージを消費しますが、デフォルト キューからのメッセージは消費されません。

私の他の問題は - 2 番目のアプリはsinkoutputのみを使用していますが、何も指定していないため、デフォルトである出力チャネルのバインディングも作成します。

同じ Gradle スクリプトを使用して両方のアプリをビルドします。

buildscript {
    ext {
        springBootVersion = '1.3.2.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/snapshot' }
    maven { url 'https://repo.spring.io/milestone' }
}

dependencies {
    compile(
            'org.springframework.cloud:spring-cloud-starter-stream-rabbit',
    )
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
    }
}

最初のアプリ プロパティ:

server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

最初のアプリのソース コード:

@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}

2 番目のアプリのプロパティ:

server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

2 番目のアプリのソース コード:

@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}

だから私の質問はです。

  • ソースシンクが同じチャネルを使用して、結果として同じブローカー キューを使用するべきではありませんか? それを達成するための適切な構成は何ですか?(私の目標は複数のシンクサービス インスタンスを持つことですが、メッセージを消費するのは 1 つだけです。)
  • シンクのみを使用している場合、フレームワークは出力バインディングを作成する必要がありますか? はいの場合、それを無効にする方法。
4

1 に答える 1

2

デフォルトでは; コンシューマはそれぞれ独自のキューを取得します。これはパブリッシュ/サブスクライブのシナリオです。

コンシューマーの概念があるgroupため、複数のインスタンスで同じキューからのメッセージを競合させることができます。

プロデューサをバインドすると、デフォルトのキューがバインドされます。

グループへの参加を希望する場合default; グループを設定する必要があります:

spring.cloud.stream.bindings.input.group=default

グループを指定しない場合は、排他的な自動削除キューが作成されます。

編集

デフォルトのキューは永続的であるため、設定する必要もあります

spring.cloud.stream.bindings.input.durableSubscription=true

コンシューマーがバインドするときに警告を回避し、コンシューマーが最初にバインドされ、キューがまだ存在しない場合にキューが永続的であることを確認します。

于 2016-02-05T19:44:07.437 に答える