0

//10 個のアイテムのウィンドウを作成する

WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream = inputStream.keyBy("rackId").countWindow(10);

// ウィンドウ関数を適用し、ウィンドウ内のすべての値を評価するカスタムを追加します

DataStream<ObservationEvent> inactivityStream = windowStream.apply(new WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>() { 

                        @Override 
                        public void apply(Tuple tuple, GlobalWindow timeWindow, Iterable<ObservationEvent> itr, Collector<ObservationEvent> out) 
                                //custom evaluation logic 
                                out.collect(new ObservationEvent(1,"temperature", "stable")); 
                        } 
                });

//単純な CEP パターンの定義

 Pattern<ObservationEvent, ?> inactivityPattern = Pattern.ObservationEvent>begin("first") 
                                .subtype(ObservationEvent.class) 
                                .where(new FilterFunction<ObservationEvent>() { 

                                        @Override 
                                        public boolean filter(ObservationEvent arg0) throws Exception { 
                                                System.out.println( arg0 );  //This function is not at all called
                                                return false; 
                                        } 
                });



PatternStream<ObservationEvent> inactivityCEP = CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern); 

このコードを実行すると、where 句内のフィルター関数がまったく呼び出されません。inactivityStream.print ()を出力しましたが、一致する値を確認できます。

さて、ウィンドウを適用せずにinputStreamを直接プラグインすると。柄が合っている

inputStream と WindowedStream を出力したところ、どちらも同様の種類のデータを送信していることがわかります。

何が足りないの

4

1 に答える 1

0

FilterFunction は最終的に呼び出されるはずですが、初めて FilterFunction が呼び出されるのを見る前に、同じキーに対して 10 個のイベントを待つ必要があります。ウィンドウテストで十分に長く待っていない可能性がありますか?

一意のキーが多数ある場合は、フィルター関数が呼び出されるまで、ウィンドウ テストで 10 倍以上長く待機する必要があることを意味することに注意してください。

于 2017-01-06T21:36:55.623 に答える