Trigger
初めて 20 秒で起動し、その後は 5 秒ごとに起動する を作成したいと考えています。私は使用GlobalWindows
してカスタムTrigger
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
のコードは次のTradeTrigger
とおりです。
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
} else {
if((System.currentTimeMillis()-ctime) >= 5000){
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
public static <W extends Window> TradeTrigger<W> of() {
return new TradeTrigger<>();
}
}
したがって、基本的に、flag
がの場合false
、つまり初めての場合、Trigger
は 20 秒以内に起動され、 が に設定さflag
れtrue
ます。次回からは5秒ごとに発射されるはずです。
私が直面している問題は、Trigger
が起動されるたびに出力にメッセージが 1 つしか表示されないことです。つまり、20 秒後に 1 つのメッセージを受け取り、5 秒ごとに 1 つのメッセージを受け取ります。トリガーごとに 20 のメッセージが出力されることを期待しています。
5 秒のタイム ウィンドウを使用.timeWindow(Time.seconds(5))
して作成すると、5 秒ごとに 20 個のメッセージが出力されます。このコードを正しく理解するのを手伝ってください。足りないものはありますか?