0

Flink 1.2-Snapshot を使用しています。私のデータは次のようになります。

  • id=25398102、sourceId=1、ts=2016-10-15 00:00:56、ユーザー=14、値=919
  • id=25398185、sourceId=1、ts=2016-10-15 00:01:06、ユーザー=14、値=920
  • id=25398210、sourceId=1、ts=2016-10-15 00:01:16、ユーザー=14、値=944
  • id=25398235、sourceId=1、ts=2016-10-15 00:01:24、ユーザー=3149、値=944
  • id=25398236、sourceId=1、ts=2016-10-15 00:01:25、ユーザー=71、値=955
  • id=25398239、sourceId=1、ts=2016-10-15 00:01:26、ユーザー=71、値=955
  • id=25398265、sourceId=1、ts=2016-10-15 00:01:36、ユーザー=71、値=955
  • id=25398310、sourceId=1、ts=2016-10-15 00:02:16、ユーザー=14、値=960
  • id=25398320、sourceId=1、ts=2016-10-15 00:02:26、ユーザー=14、値=1000

Windows ベースのユーザー ID を作成するために、次のコードを実行しています。

    stream.flatMap(new LogsParser())
            .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
            .keyBy("sourceId")
            .window(GlobalWindows.create())
            .trigger(PurgingTrigger.of(new MySessionTrigger()))
            .apply(new SessionWindowFunction())
            .print();

MySession トリガーは、受信したイベントを調べ、ユーザー ID をチェックして、ユーザー ID の変更時にウィンドウをトリガーします。SessionWindowFunction は、ウィンドウからセッションを作成するだけです。

作成されたセッションは次のとおりです。

  1. セッション:

    • id=25398102、sourceId=1、ts=2016-10-15 00:00:56、ユーザー=14、値=919
    • id=25398185、sourceId=1、ts=2016-10-15 00:01:06、ユーザー=14、値=920
    • id=25398210、sourceId=1、ts=2016-10-15 00:01:16、ユーザー=14、値=944
    • id=25398235、sourceId=1、ts=2016-10-15 00:01:24、ユーザー=3149、値=944
  2. セッション:

    • id=25398236、sourceId=1、ts=2016-10-15 00:01:25、ユーザー=71、値=955
    • id=25398239、sourceId=1、ts=2016-10-15 00:01:26、ユーザー=71、値=955
    • id=25398265、sourceId=1、ts=2016-10-15 00:01:36、ユーザー=71、値=955
    • id=25398310、sourceId=1、ts=2016-10-15 00:02:16、ユーザー=14、値=960
  3. セッション:

    • id=25398320、sourceId=1、ts=2016-10-15 00:02:26、ユーザー=14、値=1000

ご覧のとおり、問題は、すべてのセッションで最後のイベントが実際には次のウィンドウに属していることです。最後のイベントがすでにウィンドウ内にあるため、ウィンドウをトリガーする決定は何らかの理由で遅れています。

そのウィンドウの最後のイベントを考慮せずにウィンドウをトリガーするにはどうすればよいですか?

4

1 に答える 1

0

1 つのアイデアは、フラットマップを使用して、ユーザー ID が変更されるたびにマーカーをストリームに挿入することです。次に、これらのマーカーのいずれかが検出されるたびにトリガー関数を起動し、セッション ウィンドウ関数でマーカーを除外できます。

于 2016-11-11T15:39:36.237 に答える