1

Kafka ストリームを読み取り、セッションを集約するための Flink ジョブを実装しようとしていますが、何らかの理由で getResult() が呼び出されていません。createAccumulator() と add() が呼び出されたことがわかりました。集約されたメッセージを送信先にシンクできるように、getResult() も呼び出されることを期待しています。

        source.keyBy(new KeySelector<GenericRecord, String>() {
                    @Override
                    public String getKey(GenericRecord record) {
                        return record.get("id").toString();
                    }})
                .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<GenericRecord>() {
                    private static final long serialVersionUID = -4834111073247835189L;
                    private final long maxTimeLag = 300000L;

                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(GenericRecord lastElement, long extractedTimestamp) {
                        return new Watermark(extractedTimestamp - maxTimeLag);
                    }

                    @Override
                    public long extractTimestamp(GenericRecord element, long previousElementTimestamp) {
                        long ts = 1000 * (long)element.get(("timestamp"));
                        return (ts);
                    }
                })
                .map(new ReduceAttributesMap())
                .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> e) {
                        return e.f0;
                    }
                })
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
                .aggregate(new EventAggregation())
                .addSink(...)

問題は何ですか?私は何かを誤って設定しましたか?あなたの助けに感謝!

4

1 に答える 1