1

最近、バッチ処理を高速化するために Apache Flink を使用しようとしています。column:value と無関係なインデックス列を持つテーブルがあります

基本的に、5行ごとの値の平均と範囲を計算したいと思います。次に、計算したばかりの平均に基づいて、平均と標準偏差を計算します。したがって、最良の方法はウィンドウを使用することだと思いTumbleます。

このように見えます

DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
            .window(Tumble.over("5.rows").on({what should I write?}).as("w")
            .groupBy("w")
            .select("f0.avg, f0.max-f0.min");

{The next step is to use groupedTable to calculate overall mean and stdDev} 

でも何を書けばいいのかわからない.on()。試してみ"proctime"ましたが、そのような入力はありませんとのことでした。ソースから読み取る順序でグループ化するだけです。しかし、それは時間属性でなければならないので、使用できません"f2"-インデックス列も順序付けとして。

これを行うには、タイムスタンプを追加する必要がありますか? バッチ処理で必要ですか?計算が遅くなりますか? これを解決する最善の方法は何ですか?

更新: テーブル API でスライディング ウィンドウを使用しようとしたところ、例外が発生しました。

// Calculate mean value in each group
    Table groupedTable = table
            .groupBy("f0")
            .select("f0.cast(LONG) as groupNum, f1.avg as avg")
            .orderBy("groupNum");

//Calculate moving range of group Mean using sliding window
    Table movingRangeTable = groupedTable
            .window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
            .groupBy("w")
            .select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");

例外は次のとおりです。

スレッド「メイン」での例外 java.lang.UnsupportedOperationException: イベント時のカウント スライディング グループ ウィンドウは現在サポートされていません。

org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.createEventTimeSlidingWindowDataSet (DataSetWindowAggregate.scala:456) で

org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:139) で

...

テーブル API ではスライディング ウィンドウがサポートされていないということですか? 私の記憶が正しければ、DataSet API にはウィンドウ関数がありません。では、バッチ処理で移動範囲を計算するにはどうすればよいですか?

4

1 に答える 1