2

すべてのログが集中トピックにプッシュされるトピックがありますが、可能であれば、それらのレコードの一部を別のトピックとクラスターにフィルターで除外したいと考えています。

ありがとう

4

1 に答える 1

6

Kafka ストリームでは、異なる Kafka クラスターからのソース トピックと出力トピックを使用してストリームを作成することはできません。したがって、次のコードは機能しません

streamsBuilder.stream(sourceTopicName).filter(..).to(outputTopicName)

この場合、outputTopicName はトピック sourceTopicName と同じクラスターからのものであると想定されます。

回避策として、別のクラスターから出力トピックにメッセージを送信するために、追加で作成された KafkaProducer をbootstrap.servers、外部クラスターとKStream.foreach()メソッドを指すプロパティで使用できます。

streamsBuilder.stream(sourceTopicName)
    .filter((key, value) -> ..)
    .foreach((key, value) -> 
        sendMessage(kafkaProducerFromAnotherCluster, destinationTopicName, key, value);


public static void sendMessage(KafkaProducer<String, String> kafkaProducer, 
                               String destinationTopicName, String key, String value) {
    try {
        kafkaProducer.send(new ProducerRecord(destinationTopicName, key, value));
    } catch (RuntimeException ex) {
        log.error(errorMessage, ex);
    }
}

もう 1 つのオプションは、Kafka クラスターに出力トピックを作成し、メッセージをフィルター処理して、2 つのクラスター間でKafka ミラーリングをセットアップすることです (メッセージはあるトピックから別のクラスターにコピーされます)。

于 2018-10-28T19:14:43.010 に答える