0

このトピックについて多くの質問がありますが、これは重複した質問ではありません!

私が直面している問題は、Java 14 と Kafka 2.5.0 を使用して SpringBoot プロジェクトをセットアップしようとしたところ、Consumer がレコードの空のリストを返すことです。ここでのほとんどの回答は、頻繁にポーリングするか、オフセット モードを Earlyに設定するために、いくつかの忘れられたプロパティを示しています。

私の構成設定は型にはまらないように見えますが、 docs.confluent.ioとの論理的な違いはわかりません(以下のスニペットのjaas.confの設定を参照してください)。

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaConsumer<Long, MyClass> consumerConfigs() {
        Properties config = new Properties();

        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);

        System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");

        return new KafkaConsumer<>(config);
    }
}

ただし、これは機能します。例外 (Kafka など) は発生せず、接続が確立されます。

// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};

これが私が実際にポーリングしているところです:

try {
            KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
            consumer.subscribe(Collections.singletonList(inputTopic));

            int count = 0;
            Long start = System.currentTimeMillis();
            Long end = System.currentTimeMillis();

            while (end - start < 900_000) { 
                // boolean would be set to true in production
                ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> {
                    MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
                    System.out.println(result);
                });
               
                consumer.commitSync();

                System.out.println("visualize number of loops made: " + ++count);
                end = System.currentTimeMillis();
            }
        } catch (KafkaException e) {
            e.printStackTrace();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

問題を見つけようとするために、印刷物やその他の混乱を追加しました。プログラムをデバッグ モードで実行し、次の行にブレークポイントを配置しました。

MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());

その結果、予想どおり、1 秒ごとのカウントが出力された行が表示されます。しかし、私のコンシューマーはレコードを返さないため、決して入力せforEachず、ブレークポイントをトリガーしません。

私のトピックは、2 つのパーティションがあるクラウドで確実に見ることができます。メッセージは安定した流れで生成されるので、何かを拾うことができるはずです。

クラスターに接続するのに時間がかかることはわかっていますが、現在の時刻が 15 分に設定されているので、少なくとも何かを受け取るはずですよね? 別の方法として、 TopicPartition を指定し、コンシューマーを に設定consumer.subscribe()したメソッドに を切り替えてみました。正常に実行されましたが、何も返されませんでした。consumer.assign()consumer.seekToBeginning()

最も一般的な例に見られないもう 1 つのことは、独自のクラスを使用していることです。の代わりに、このチュートリアルKafkaConsumer<String, String>に従ってカスタム (デ) シリアライザーを実装しました。

それは私の構成設定でしょうか?ポーリングのタイムアウトに何か問題がありますか? (デ)シリアライゼーション、または完全に別のもの? レコードがゼロになっている理由を正確に特定することはできません。どんなフィードバックでも大歓迎です!

4

1 に答える 1

0

問題が解決しました。投稿された質問から判断できるものではありませんでしたが、他の誰かが同様の構成で立ち往生していることに気付いた場合に備えて、いくつかのことを明確にしたいと思います。

  1. 受け取ったパスワードが本当に正しいことを確認してください。フェイスパーム

彼がクラスターに接続していると思ったのですが、代わりに、.poll(Duration.ofMillis(1000))メソッドが実行されるため、ループがカウントを出力し続けました->指定されたタイムアウト内に接続できるかどうかを確認します->接続が確立されている場合はゼロレコードを返します失敗した。エラーはスローされません。通常、2 秒ほどで接続が確立されます。

  1. データベースへの接続を確認してください。

アプリケーションを停止したくないので、myOwnKafkaService.getSomethingFromRecord(record.key(), record.value())すべてのエラーをログに記録する方法を設計しましたが、すべての例外がキャッチされます。ログを確認するまで、リモート データベースへのアクセス許可が正しくないことに気付きました。

  1. TimeStamp として知られているものは、java.util.Date にデシリアライズする必要があります。

間違って解析すると例外がスローされますが、私のメソッドはnull. この回答のすべての発言のように、これもまた、そのような設定で経験が浅いことに要約されます。以下の修正されたクラスは、実際の例として役立ちます (ただし、完全にベスト プラクティスというわけではありません)。

KafkaConfig:

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaConsumer<Long, MyClass> consumerConfigs() {
        Properties config = new Properties();

        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100_000);
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300_000);
        config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);

        System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");

        return new KafkaConsumer<>(config);
    }
}

ポーリング メソッドの本体:

            KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
            consumer.subscribe(Collections.singletonList(inputTopic));

            while (true) {
                ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> {
                    MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
                    System.out.println(result);
                });
                consumer.commitSync();
            }

Deserializer を使用した MyClass の小さな例:

@Data
@Slf4J
public class MyClass implements Deserializer<MyClass> {

    @JsonProperty("UNIQUE_KEY")
    private Long uniqueKey;
    @JsonProperty("EVENT_TIMESTAMP")
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
    private Date eventTimestamp;
    @JsonProperty("SOME_OTHER_FIELD")
    private String someOtherField;

@Override
    public MyClass deserialize(String s, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        MyClass event = null;
        try {
            event = mapper
                    .registerModule(new JavaTimeModule())
                    .readValue(bytes, MyClass.class);
        } catch (Exception e) {
            log.error("Something went wrong during the deserialization of the MyClass: {}", e.getMessage());
        }
        return event;
    }
}

これが将来誰かに役立つことを願っています。私は挫折と過ちからかなりのことを学びました。

于 2020-07-10T16:05:47.107 に答える