1

次のように、Confluent Cloud 上の私の kafka クラスタからデータを読み取るために、次のように Apchea Beam で非常に単純なパイプラインを作成しました。

        Pipeline pipeline = Pipeline.create(options);

        Map<String, Object> propertyBuilder = new HashMap();
        propertyBuilder.put("ssl.endpoint.identification.algorithm", "https");
        propertyBuilder.put("sasl.mechanism","PLAIN");
        propertyBuilder.put("request.timeout.ms","20000");
        propertyBuilder.put("retry.backoff.ms","500");

        pipeline
            .apply(KafkaIO.<byte[], byte[]>readBytes()
               .withBootstrapServers("pkc-epgnk.us-central1.gcp.confluent.cloud:9092")
               .withTopic("gcp-ingestion-1")  
               .withKeyDeserializer(ByteArrayDeserializer.class)
               .withValueDeserializer(ByteArrayDeserializer.class)
               .updateConsumerProperties(propertyBuilder)             
               .withoutMetadata() // PCollection<KV<Long, String>>
            ) .apply(Values.<byte[]>create());

ただし、上記のコードを実行してカフカクラスターからデータを読み取ると、例外が発生します

私は直接 Java ランナーで上記を実行します。私はビーム 2.8 を使用しています。

上記のコードではできませんが、カフカ合流クラスターへのメッセージを読み取って生成できます。

4

1 に答える 1