0

Kafka サーバーにトピックがあります。プログラムでは、このトピックをストリームとして読み取り、イベント タイムスタンプを割り当てます。次に、このストリームに対してウィンドウ操作を行います。しかし、プログラムは機能しません。デバッグ後、WindowOperatorのprocessWatermarkメソッドが実行されていないようです。これが私のコードです。

    DataStream<Tuple2<String, Long>> advertisement = env
            .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;

                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).assignTimestamps(timestampExtractor);

    advertisement
            .keyBy(keySelector)
            .window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
            .apply(new WindowFunction<Tuple2<String,Long>, Integer, String, TimeWindow>() {
                private static final long serialVersionUID = 5151607280638477891L;
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<Integer> out) throws Exception {
                    out.collect(Iterables.size(values));
                }
            }).print();

なぜこれが起こったのですか?「assignTimestamps(timestampExtractor)」の前に「keyBy(keySelector)」を追加すると、プログラムは機能します。誰でも理由を説明できますか?

4

1 に答える 1