0

Trigger初めて 20 秒で起動し、その後は 5 秒ごとに起動する を作成したいと考えています。私は使用GlobalWindowsしてカスタムTrigger

val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())

のコードは次のTradeTriggerとおりです。

@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    static boolean flag=false;
    static long ctime = System.currentTimeMillis();

    private TradeTrigger() {
    }

    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub

        if(flag == false){
            if((System.currentTimeMillis()-ctime) >= 20000){
               flag = true;
               ctime = System.currentTimeMillis();
               return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        } else {
            if((System.currentTimeMillis()-ctime) >= 5000){
                ctime = System.currentTimeMillis();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

    }

    @Override
    public TriggerResult onEventTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }


    public static <W extends Window> TradeTrigger<W> of() {
        return new TradeTrigger<>();
    }

}

したがって、基本的に、flagがの場合false、つまり初めての場合、Triggerは 20 秒以内に起動され、 が に設定さflagtrueます。次回からは5秒ごとに発射されるはずです。

私が直面している問題は、Triggerが起動されるたびに出力にメッセージが 1 つしか表示されないことです。つまり、20 秒後に 1 つのメッセージを受け取り、5 秒ごとに 1 つのメッセージを受け取ります。トリガーごとに 20 のメッセージが出力されることを期待しています。

5 秒のタイム ウィンドウを使用.timeWindow(Time.seconds(5))して作成すると、5 秒ごとに 20 個のメッセージが出力されます。このコードを正しく理解するのを手伝ってください。足りないものはありますか?

4

2 に答える 2

1

Fabian および Flink メーリング リストからの回答の助けを借りて動作するようになりました。ValueStateを通じて状態を変数に格納しましたTriggerContext。メソッドで変数を確認しonEvent()、初めてのprocessingTimeTimer場合は現在時刻より 20 秒多く a を登録し、状態を更新しました。onProcessingTimeメソッドでは、現在時刻より 5 秒長く別の時刻を登録しProcessingTimeTimer、状態を更新してWindow.

于 2016-04-25T12:46:15.090 に答える