8

2 つの KTables に参加しようとしています。

KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(RecordBean.class),
    bidTopic, RECORDS_STORE);

KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(ImpressionBean.class),
    impressionTopic, IMPRESSIONS_STORE);

KTable<String, RecordBean> mergedByTxId = recordsTable
    .join(impressionsTable, merge());

マージ関数は非常に単純です。ある Bean から別の Bean に値をコピーするだけです。

public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() {
return (v1, v2) -> {
  v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount());
  return v1;
};

しかし、何らかの理由で、結合関数が 1 つの生成されたレコードに対して 2 回呼び出されています。以下のストリーミング/プロデューサー構成を参照してください

Properties streamsConfiguration = new Properties();
streamsConfiguration
    .put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("kafka-streams-tmp")
    .getAbsolutePath());

return streamsConfiguration;

プロデューサー構成 -

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return producerConfig;

次に、ストリームごとに 1 つのレコードを送信しています。両方のレコードのキーは同じです。出力として単一のレコードを受け取ることを期待しています。

 IntegrationTestUtils.produceKeyValuesSynchronously(bidsTopic,
    Arrays.asList(new KeyValue("1", getRecordBean("1"))),
    getProducerProperties());

IntegrationTestUtils.produceKeyValuesSynchronously(impressionTopic,
    Arrays.asList(new KeyValue("1", getImpressionBean("1"))),
    getProducerProperties());

List<KeyValue<String, String>> parsedRecord =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
        outputTopic, 1);

しかし、ValueJoiner は 2 回トリガーし、1 つではなく 2 つの同一の出力レコードを取得しています。トリガー時間中 - 両方のストリームからの両方の値が存在し、2 番目の実行をトリガーしているものを取得できません。

参加しないと、この動作を再現できません。2 ktable join の実例が見つからないため、私のアプローチの何が問題なのか理解できません。

同じ動作を示す簡単なコードを追加する

KStreamBuilder builder = new KStreamBuilder();

KTable<String, String> first = builder.table("stream1", "storage1");
KTable<String, String> second = builder.table("stream2", "storage2");

KTable<String, String> joined = first.join(second, (value1, value2) -> value1);

joined.to("output");

KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties());

streams.start();

IntegrationTestUtils.produceKeyValuesSynchronously("stream1",
    Arrays.asList(new KeyValue("1", "first stream")),
    getProducerProperties());

IntegrationTestUtils.produceKeyValuesSynchronously("stream2",
    Arrays.asList(new KeyValue("1", "second stream")),
    getProducerProperties());

List<KeyValue<String, String>> parsedRecord =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
        "output", 1);
4

2 に答える 2

1

2 つの KTables 間で leftJoin を使用して同じ動作を見つけ、グーグルで調べた後、この投稿に出くわしました。使用していた kafka-streams のバージョンはわかりませんが、コンフルエントなコードをデバッグした後、kafka-streams バージョン 2.0.1 は、特定の種類の結合で古い値と新しい値を意図的に送信しているように見えるため、バリュージョイナー。

org.apache.kafka.streams.kstream.internals.KTableImpl#buildJoin結合トポロジを構築する実装org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin.KTableKTableRightJoinProcessor#processと、実行時にディスパッチする実装を見てください。一部のシナリオでは、明らかに 2 回行われています。

この動作の背景は次のとおりですhttps://issues.apache.org/jira/browse/KAFKA-2984

于 2019-05-22T23:39:08.840 に答える