5

2 番目のイベントが x 秒以内に最初のイベントに続かなかった場合、状態を変更する必要があるシナリオがあります。たとえば、ユーザーが 100 分以内にログアウトしなかった場合、そのユーザーは無効な状態にあると見なされます。これは、現在のパターン操作を使用してどのように設計できますか?

4

2 に答える 2

5

As this has been implemented already, I thought of answering this question for those who are coming here looking for answers.

As of Flink 1.0.0, this can be done with handling the Timedout Pattern, for example, if your CEP pattern is something like this :

Example partially from from Flink Website (There are some major changes between 1.2 and 1.3 please adjust your code accordingly, this answer focuses on 1.3)

Pattern description: - Get first event of type "error", followed by a second event event of type "critical" within 10 seconds

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("error");
    }
}).followedBy("end").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("critical");
    }
}).within(Time.seconds(10));

PatternStream<BAMEvent> patternStream = CEP.pattern(inputStream, pattern)

DataStream<Either<String, String>> result = patternStream.select(new PatternTimeoutFunction<Event, String>() {
  @Override
  public String timeout(Map<String, List<Event>> map, long l) throws Exception {
    return map.toString() +" @ "+ l;
  }
}, new PatternSelectFunction<Event, String>() {

  @Override
  public String select(Map<String, List<Event>> map) throws Exception {
    return map.toString();
  }
});

For this case, if the user doesn't logout even after 100 mins, then as the corresponding event wouldn't arrive, it would result in the pattern being timedout and the partial event(the initiating event) would be captured in the PatternTimeoutFunction.

于 2017-07-25T12:31:15.137 に答える
2

現時点では、これを行うことはできません。解決策は、イベント シーケンスが定義された時間枠から外れたために破棄されるたびにトリガーされるタイムアウト ハンドラーを用意することです。タイムアウト ハンドラーの実装を追跡する JIRA の問題が既に存在します。

于 2016-04-12T14:42:27.287 に答える