0

Flink 1.2 CEP を使用して、デバイスから欠落しているハート ビート イベントを検出しています。RabbitMQ ソースからハート ビート イベントを読み取り、次のパターンを使用して、シリアル番号でキーが設定されたデバイスのハート ビートの欠落をタイムアウト関数で検出します。

このパターン ストリームは、少なくとも 1 つのハートビートがデバイスから送信される場合に機能します。しかし、アプリケーションの起動後にハートビートが 1 つも開始されていないデバイスの欠落ハートビートを検出するユースケースも処理する必要があります。

このためには、すべてのデバイスの init Heart Beat イベントで入力ハートビート ストリームを初期化する必要があります。ストリームを初期化すると、最初のハートビートを受信して​​いないデバイスも処理され、タイムアウトしてアラートが発生します。

RMQSource 関数からリッスンする前であっても、すべてのデバイスの初期化ハート ビート データでデータ ストリームを初期化するにはどうすればよいですか?

//Reading heart beat of device from RabbitMQ queue
DataStream<HeartBeatEvent> heartBeatStream= 
    env.addSource(rmqSource).assignTimestampsAndWatermarks(new 
           IngestionTimeExtractor<String>());                                   

//Pattern to detect missing heartbeat
Pattern<HeartBeanEvent, ?> heartBeatEventPattern = Pattern.< 
          HeartBeanEvent >begin("first")
            .subtype(HeartBeanEvent.class)
            .next("second")
            .subtype(HeartBeanEvent.class)
            .within(Time.seconds(360));

DataStream<Either< HeartBeanEvent, String>> result = 
     CEP.pattern(heartBeatStream.keyBy(serialNum), 
                 heartBeatEventPattern).
  select(new PatternTimeoutFunction< HeartBeanEvent, HeartBeanEvent >() {
            public HeartBeanEvent timeout(Map<String, HeartBeanEvent > 
                   pattern, long timeoutTimestamp) throws Exception {
   System.out.println("Missing heart beat:" + 
           pattern.get("first").getSerialNum() + ":" + 
           pattern.get("first").getEventTime());  
           return pattern.get("first");
            }
    },new PatternSelectFunction< HeartBeanEvent, String>() {
            public String select(Map<String, HeartBeanEvent > pattern) {                     
                return null;
            }
        });
4

0 に答える 0