このトピックについて多くの質問がありますが、これは重複した質問ではありません!
私が直面している問題は、Java 14 と Kafka 2.5.0 を使用して SpringBoot プロジェクトをセットアップしようとしたところ、Consumer がレコードの空のリストを返すことです。ここでのほとんどの回答は、頻繁にポーリングするか、オフセット モードを Earlyに設定するために、いくつかの忘れられたプロパティを示しています。
私の構成設定は型にはまらないように見えますが、 docs.confluent.ioとの論理的な違いはわかりません(以下のスニペットのjaas.confの設定を参照してください)。
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");
return new KafkaConsumer<>(config);
}
}
ただし、これは機能します。例外 (Kafka など) は発生せず、接続が確立されます。
// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};
これが私が実際にポーリングしているところです:
try {
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
int count = 0;
Long start = System.currentTimeMillis();
Long end = System.currentTimeMillis();
while (end - start < 900_000) {
// boolean would be set to true in production
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
System.out.println("visualize number of loops made: " + ++count);
end = System.currentTimeMillis();
}
} catch (KafkaException e) {
e.printStackTrace();
} catch (Exception e) {
System.out.println(e.getMessage());
}
問題を見つけようとするために、印刷物やその他の混乱を追加しました。プログラムをデバッグ モードで実行し、次の行にブレークポイントを配置しました。
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
その結果、予想どおり、1 秒ごとのカウントが出力された行が表示されます。しかし、私のコンシューマーはレコードを返さないため、決して入力せforEach
ず、ブレークポイントをトリガーしません。
私のトピックは、2 つのパーティションがあるクラウドで確実に見ることができます。メッセージは安定した流れで生成されるので、何かを拾うことができるはずです。
クラスターに接続するのに時間がかかることはわかっていますが、現在の時刻が 15 分に設定されているので、少なくとも何かを受け取るはずですよね? 別の方法として、 TopicPartition を指定し、コンシューマーを に設定consumer.subscribe()
したメソッドに を切り替えてみました。正常に実行されましたが、何も返されませんでした。consumer.assign()
consumer.seekToBeginning()
最も一般的な例に見られないもう 1 つのことは、独自のクラスを使用していることです。の代わりに、このチュートリアルKafkaConsumer<String, String>
に従ってカスタム (デ) シリアライザーを実装しました。
それは私の構成設定でしょうか?ポーリングのタイムアウトに何か問題がありますか? (デ)シリアライゼーション、または完全に別のもの? レコードがゼロになっている理由を正確に特定することはできません。どんなフィードバックでも大歓迎です!