40

私がやりたいことはこれです:

  1. 数値トピックからレコードを消費する (Long's)
  2. 5 秒のウィンドウごとに値を集計 (カウント) します。
  3. FINAL 集計結果を別のトピックに送信する

私のコードは次のようになります。

KStream<String, Long> longs = builder.stream(
            Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
            longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
          .to("long-counts");

すべてが期待どおりに機能しているように見えますが、集計は着信レコードごとに宛先トピックに送信されます。私の質問は、各ウィンドウの最終集計結果のみを送信するにはどうすればよいですか?

4

3 に答える 3

32

Kafka Streams には、「最終集計」などはありません。ウィンドウは、ウィンドウの終了時間が経過した後に到着する順不同のレコードを処理するために、常に開いたままになっています。ただし、ウィンドウは永久に保持されるわけではありません。保持期間が満了すると、それらは破棄されます。ウィンドウがいつ破棄されるかについて特別なアクションはありません。

詳細については、Confluent のドキュメントを参照してください: http://docs.confluent.io/current/streams/

したがって、集約に対する更新ごとに、結果レコードが生成されます (Kafka ストリームは、順不同のレコードの集約結果も更新するため)。あなたの「最終結果」は、最新の結果レコードになります(ウィンドウが破棄される前)。ユースケースによっては、手動の重複除外が問題を解決する方法になります (下位レベルの API を使用するか、transform()またはprocess()) 。

このブログ投稿も役立つかもしれません: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not- looking-at-facebook.html

句読点を使用せずにこの問題に対処する別のブログ投稿: http://blog.inovatrend.com/2018/03/make-of-message-gateway-with-kafka.html

アップデート

KIP-328では、KTable#suppress()演算子が追加され、厳密な方法で連続した更新を抑制し、ウィンドウごとに 1 つの結果レコードを発行できるようになります。トレードオフはレイテンシの増加です。

于 2016-08-14T18:24:45.837 に答える
0

問題に直面しましたが、この問題を解決して、修正されたウィンドウの後に grace(0) を追加し、抑制された API を使用します

public void process(KStream<SensorKeyDTO, SensorDataDTO> stream) {

        buildAggregateMetricsBySensor(stream)
                .to(outputTopic, Produced.with(String(), new SensorAggregateMetricsSerde()));

    }

private KStream<String, SensorAggregateMetricsDTO> buildAggregateMetricsBySensor(KStream<SensorKeyDTO, SensorDataDTO> stream) {
        return stream
                .map((key, val) -> new KeyValue<>(val.getId(), val))
                .groupByKey(Grouped.with(String(), new SensorDataSerde()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(WINDOW_SIZE_IN_MINUTES)).grace(Duration.ofMillis(0)))
                .aggregate(SensorAggregateMetricsDTO::new,
                        (String k, SensorDataDTO v, SensorAggregateMetricsDTO va) -> aggregateData(v, va),
                        buildWindowPersistentStore())
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream()
                .map((key, value) -> KeyValue.pair(key.key(), value));
    }


    private Materialized<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>> buildWindowPersistentStore() {
        return Materialized
                .<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>>as(WINDOW_STORE_NAME)
                .withKeySerde(String())
                .withValueSerde(new SensorAggregateMetricsSerde());
    }

ここで結果を見ることができます

ここに画像の説明を入力

于 2020-09-11T17:36:49.790 に答える