最近、バッチ処理を高速化するために Apache Flink を使用しようとしています。column:value と無関係なインデックス列を持つテーブルがあります
基本的に、5行ごとの値の平均と範囲を計算したいと思います。次に、計算したばかりの平均に基づいて、平均と標準偏差を計算します。したがって、最良の方法はウィンドウを使用することだと思いTumble
ます。
このように見えます
DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
.window(Tumble.over("5.rows").on({what should I write?}).as("w")
.groupBy("w")
.select("f0.avg, f0.max-f0.min");
{The next step is to use groupedTable to calculate overall mean and stdDev}
でも何を書けばいいのかわからない.on()
。試してみ"proctime"
ましたが、そのような入力はありませんとのことでした。ソースから読み取る順序でグループ化するだけです。しかし、それは時間属性でなければならないので、使用できません"f2"
-インデックス列も順序付けとして。
これを行うには、タイムスタンプを追加する必要がありますか? バッチ処理で必要ですか?計算が遅くなりますか? これを解決する最善の方法は何ですか?
更新: テーブル API でスライディング ウィンドウを使用しようとしたところ、例外が発生しました。
// Calculate mean value in each group
Table groupedTable = table
.groupBy("f0")
.select("f0.cast(LONG) as groupNum, f1.avg as avg")
.orderBy("groupNum");
//Calculate moving range of group Mean using sliding window
Table movingRangeTable = groupedTable
.window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
.groupBy("w")
.select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");
例外は次のとおりです。
スレッド「メイン」での例外 java.lang.UnsupportedOperationException: イベント時のカウント スライディング グループ ウィンドウは現在サポートされていません。
org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.createEventTimeSlidingWindowDataSet (DataSetWindowAggregate.scala:456) で
org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:139) で
...
テーブル API ではスライディング ウィンドウがサポートされていないということですか? 私の記憶が正しければ、DataSet API にはウィンドウ関数がありません。では、バッチ処理で移動範囲を計算するにはどうすればよいですか?