10

I'm running into some troubles understanding the semantics around event time windowing. The following program generates some tuples with timestamps that are used as event time and does a simple window aggregation. I would expect the output to be in the same order as the input, but the output is ordered differently. Why is the output out of order with respect to event time?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}

The input:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)

Result:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
4

1 に答える 1

12

この動作の理由は、Flink では (タイムスタンプに関して) 要素の順序が考慮されていないためです。透かしは通常、時間ベースの操作で計算をトリガーするため、時間を考慮する操作では、透かしの正確さと要素のタイムスタンプとの関係のみが重要です。

あなたの例では、ウィンドウ オペレーターは、ソースからのすべての要素を内部ウィンドウ バッファーに格納します。次に、ソースは、タイムスタンプが小さい要素が今後到着しないことを示す透かしを発行します。これにより、ウィンドウ オペレータは終了タイムスタンプがウォーターマークを下回るすべてのウィンドウを処理するようになります (これはすべてのウィンドウに当てはまります)。したがって、すべてのウィンドウを (任意の順序で) 発行し、その後、透かし自体を発行します。これより下流の操作は、それ自体が要素を受け取り、透かしを受け取ったら処理を行うことができます。

デフォルトでは、ソースから透かしが放出される間隔は 200 ミリ秒です。ソースが放出する少量の要素では、最初の透かしが放出される前にそれらすべてが放出されます。ウォーターマークの発行間隔がウィンドウ サイズよりもはるかに小さい実際の使用例では、タイムスタンプの順序でウィンドウが発行されると予想される動作が得られます。たとえば、1 時間のウィンドウと 500 ミリ秒ごとのウォーターマークがあるとします。

于 2015-12-09T14:31:12.337 に答える