次のように、Kafka Streams DSL を使用して Kafka クラスターに接続する Kafka Streams アプリがあります。
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);
// do work
kStreams = new KafkaStreams(builder, config);
kStreams.start();
そして、コンシューマー クライアントを直接使用してクラスターへの接続を確立するコード ベースの別の部分。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();
これを行っている理由は、アプリの他の部分 (Kafka Streams トポロジーを含む) を条件付きで開始する前に、コンシューマー グループに関するメタ データを収集するためです。InconsistentGroupProtocolException
これを行う方法はおそらく他にもありますが (たとえば、さまざまなフックを使用するなど)、これらのメソッドを混在させると (断続的に)スローされることがある理由についてもっと知りたいです。
なぜこれがスローされているのか、誰かが光を当てることができますか? ソースコード自体から正確に何が起こっているのかを判断するのに苦労していますが、Kafka Streams によって構築された基礎となるコンシューマーは、クライアントとは異なるパーティショニング プロトコルを指定していると思いKafkaConsumer
ます。とにかく、この例外を理解するための助けは大歓迎です