8

次のように、Kafka Streams DSL を使用して Kafka クラスターに接続する Kafka Streams アプリがあります。

KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);

// do work

kStreams = new KafkaStreams(builder, config);
kStreams.start();

そして、コンシューマー クライアントを直接使用してクラスターへの接続を確立するコード ベースの別の部分。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();

これを行っている理由は、アプリの他の部分 (Kafka Streams トポロジーを含む) を条件付きで開始する前に、コンシューマー グループに関するメタ データを収集するためです。InconsistentGroupProtocolExceptionこれを行う方法はおそらく他にもありますが (たとえば、さまざまなフックを使用するなど)、これらのメソッドを混在させると (断続的に)スローされることがある理由についてもっと知りたいです。

なぜこれがスローされているのか、誰かが光を当てることができますか? ソースコード自体から正確に何が起こっているのかを判断するのに苦労していますが、Kafka Streams によって構築された基礎となるコンシューマーは、クライアントとは異なるパーティショニング プロトコルを指定していると思いKafkaConsumerます。とにかく、この例外を理解するための助けは大歓迎です

4

1 に答える 1

14

あなたは自分で答えを出します。Kafka Streams はカスタム パーティション アサイナーを使用し、Kafka Streams クライアントは他の Kafka Streams クライアントとのみ連携します。KafkaConsumerKafka Streams アプリと同じグループ ID を持つを使用すると、 をKafkaConsumerフェンシングして Kafka Streams コンシューマー グループに参加させることができなくなります。明らかに、KafkaConsumerKafka Streams で「遊ぶ」ことはできません。

于 2017-01-11T22:41:40.250 に答える