私のコードは次のとおりです。
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyObject> input = env.addSource(new MyCustomSource());
Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");
PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);
... 私のパターンを定義する
DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());
resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));
try
{
env.execute();
}
catch (Exception exception)
{
log.debug("Error while ", exception);
}
このコードは機能し、私が望むことを行い、設定したパターンに従う結果ストリームを取得します。
私が知りたいのは、後で環境に追加したこのソースに新しいパターンを適用できるかどうかです新しい結果ストリームに加えて、冗長な古い結果ストリームを取得します (つまり、古いパターンが複数回実行されます)?