ストリームの使用法を表示する Flink の例を拡張しようとしました。私の目標は、ウィンドウ機能を使用することです (window
関数呼び出しを参照)。以下のコードは、ストリームの最後の 3 つの数値の合計を出力すると仮定します。(ubuntuのおかげでストリームが開かれますnc -lk 9999
)実際、出力は入力されたすべての数値を合計します。タイム ウィンドウに切り替えても、同じ結果が生成されます。つまり、ウィンドウ処理は行われません。
それはバグですか?(使用バージョン: github の最新マスター)
object SocketTextStreamWordCount {
def main(args: Array[String]) {
val hostName = args(0)
val port = args(1).toInt
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create streams for names and ages by mapping the inputs to the corresponding objects
val text = env.socketTextStream(hostName, port)
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
.filter { (x:String) => x.nonEmpty }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
// .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
.map { (x:String) => ("not used; just to have a tuple for the sum", x.toInt) }
val numberOfItems = currentMap.count
numberOfItems print
val counts = currentMap.sum( 1 )
counts print
env.execute("Scala SocketTextStreamWordCount Example")
}
}