1 日に A が発生し、15 分間に B が発生する回数を数える必要があります。ストリームは、おそらく A1 、A2、B1、B2、A3、B3、B4、B5、A4、A5、A6、A7、B6 です。私の場合、イベントの結果は A2,B1 A3,B3 A7,B6 です。 そして、マッチャーが発生したときにリアルタイムで結果を受け取る必要があります。 sql-cep は集計をサポートしていません。発生したイベントのみを計算します。 この場合、単一の SQL でこのタスクを実行する方法。
私はそれを行うのに2ステップ疲れました.flink sql cepを最初にマッチャーに使用し、次にkafkaにシンクします。ステップでは、プレカフカをソースし、オーバーウィンドウを使用して集計します。
最初のステップ: select pin as pin,'first-step' as result_id, cast(order_amount as varchar) as result_value,event_time as result_time from stra_dtpipeline MATCH_RECOGNIZE ( PARTITION BY pin
ORDER BY event_time MEASURES
t1.pin as pin, '1' as order_amount , LOCALTIMESTAMP as event_time 一致ごとに 1 行 一致後 次の行にスキップ PATTERN (t1 t2) WITHIN INTERVAL '30' SECOND
DEFINE
t1 as t1.act_type='100001' , t2 as t2.act_type='100002' ) 二段目: select pin,'job5' as result_id,cast(sum(1) over (PARTITION BY pin,cast(DATE_FORMAT(event_time, '%Y%m%d') as VARCHAR) order by event_time ROWS BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW ) as result_value, CURRENT_TIMESTAMP as result_time from stra_dtpipeline_mid where result_id='first-step' and DAYOFMONTH(CURRENT_DATE) )=DAYOFMONTH(event_time)
単一の SQL でこのタスクを達成できると思います。