Flink 1.2-Snapshot を使用しています。私のデータは次のようになります。
- id=25398102、sourceId=1、ts=2016-10-15 00:00:56、ユーザー=14、値=919
- id=25398185、sourceId=1、ts=2016-10-15 00:01:06、ユーザー=14、値=920
- id=25398210、sourceId=1、ts=2016-10-15 00:01:16、ユーザー=14、値=944
- id=25398235、sourceId=1、ts=2016-10-15 00:01:24、ユーザー=3149、値=944
- id=25398236、sourceId=1、ts=2016-10-15 00:01:25、ユーザー=71、値=955
- id=25398239、sourceId=1、ts=2016-10-15 00:01:26、ユーザー=71、値=955
- id=25398265、sourceId=1、ts=2016-10-15 00:01:36、ユーザー=71、値=955
- id=25398310、sourceId=1、ts=2016-10-15 00:02:16、ユーザー=14、値=960
- id=25398320、sourceId=1、ts=2016-10-15 00:02:26、ユーザー=14、値=1000
Windows ベースのユーザー ID を作成するために、次のコードを実行しています。
stream.flatMap(new LogsParser())
.assignTimestampsAndWatermarks(new MessageTimestampExtractor())
.keyBy("sourceId")
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(new MySessionTrigger()))
.apply(new SessionWindowFunction())
.print();
MySession トリガーは、受信したイベントを調べ、ユーザー ID をチェックして、ユーザー ID の変更時にウィンドウをトリガーします。SessionWindowFunction は、ウィンドウからセッションを作成するだけです。
作成されたセッションは次のとおりです。
セッション:
- id=25398102、sourceId=1、ts=2016-10-15 00:00:56、ユーザー=14、値=919
- id=25398185、sourceId=1、ts=2016-10-15 00:01:06、ユーザー=14、値=920
- id=25398210、sourceId=1、ts=2016-10-15 00:01:16、ユーザー=14、値=944
- id=25398235、sourceId=1、ts=2016-10-15 00:01:24、ユーザー=3149、値=944
セッション:
- id=25398236、sourceId=1、ts=2016-10-15 00:01:25、ユーザー=71、値=955
- id=25398239、sourceId=1、ts=2016-10-15 00:01:26、ユーザー=71、値=955
- id=25398265、sourceId=1、ts=2016-10-15 00:01:36、ユーザー=71、値=955
- id=25398310、sourceId=1、ts=2016-10-15 00:02:16、ユーザー=14、値=960
セッション:
- id=25398320、sourceId=1、ts=2016-10-15 00:02:26、ユーザー=14、値=1000
ご覧のとおり、問題は、すべてのセッションで最後のイベントが実際には次のウィンドウに属していることです。最後のイベントがすでにウィンドウ内にあるため、ウィンドウをトリガーする決定は何らかの理由で遅れています。
そのウィンドウの最後のイベントを考慮せずにウィンドウをトリガーするにはどうすればよいですか?