//10 個のアイテムのウィンドウを作成する
WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream = inputStream.keyBy("rackId").countWindow(10);
// ウィンドウ関数を適用し、ウィンドウ内のすべての値を評価するカスタムを追加します
DataStream<ObservationEvent> inactivityStream = windowStream.apply(new WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>() {
@Override
public void apply(Tuple tuple, GlobalWindow timeWindow, Iterable<ObservationEvent> itr, Collector<ObservationEvent> out)
//custom evaluation logic
out.collect(new ObservationEvent(1,"temperature", "stable"));
}
});
//単純な CEP パターンの定義
Pattern<ObservationEvent, ?> inactivityPattern = Pattern.ObservationEvent>begin("first")
.subtype(ObservationEvent.class)
.where(new FilterFunction<ObservationEvent>() {
@Override
public boolean filter(ObservationEvent arg0) throws Exception {
System.out.println( arg0 ); //This function is not at all called
return false;
}
});
PatternStream<ObservationEvent> inactivityCEP = CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern);
このコードを実行すると、where 句内のフィルター関数がまったく呼び出されません。inactivityStream.print ()を出力しましたが、一致する値を確認できます。
さて、ウィンドウを適用せずにinputStreamを直接プラグインすると。柄が合っている
inputStream と WindowedStream を出力したところ、どちらも同様の種類のデータを送信していることがわかります。
何が足りないの