3

私のコードは次のとおりです。

StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyObject> input = env.addSource(new MyCustomSource());

Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");

PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);

... 私のパターンを定義する

DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());

resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));

try
  {
    env.execute();
  }
  catch (Exception exception)
  {
    log.debug("Error while ", exception);
  }

このコードは機能し、私が望むことを行い、設定したパターンに従う結果ストリームを取得します。

私が知りたいのは、後で環境に追加したこのソースに新しいパターンを適用できるかどうかです新しい結果ストリームに加えて、冗長な古い結果ストリームを取得します (つまり、古いパターンが複数回実行されます)?

4

1 に答える 1

1

現時点では、Flink の CEP ライブラリはそのままでは動的なパターン変更をサポートしていません。したがって、パターンを定義してジョブを開始すると、この定義されたパターンのみが処理されます。

ただし、一方の入力パターン定義ともう一方の入力でストリーム レコードを受け取るインターフェイスを実装する独自の演算子を作成できますTwoInputStreamOperator(CoFlatMap 関数と同様)。新しいパターンごとNFAに、オペレーターで new をコンパイルし、新しい受信ストリーム要素をこれNFAにもフィードする必要があります。そうすれば、意図した動作を実現できます。

将来的には、この機能を Flink の CEP ライブラリに追加する可能性が高いでしょう。

于 2016-12-05T09:36:07.487 に答える