3

ストリームの使用法を表示する Flink の例を拡張しようとしました。私の目標は、ウィンドウ機能を使用することです (window関数呼び出しを参照)。以下のコードは、ストリームの最後の 3 つの数値の合計を出力すると仮定します。(ubuntuのおかげでストリームが開かれますnc -lk 9999)実際、出力は入力されたすべての数値を合計します。タイム ウィンドウに切り替えても、同じ結果が生成されます。つまり、ウィンドウ処理は行われません。

それはバグですか?(使用バージョン: github の最新マスター)

object SocketTextStreamWordCount {
  def main(args: Array[String]) {
    val hostName = args(0)
    val port = args(1).toInt
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Create streams for names and ages by mapping the inputs to the corresponding objects
    val text = env.socketTextStream(hostName, port)    
    val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
    .filter { (x:String) => x.nonEmpty }      
    .window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
    //  .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
      .map { (x:String) => ("not used; just to have a tuple for the sum", x.toInt) }

    val numberOfItems = currentMap.count
    numberOfItems print
    val counts = currentMap.sum( 1 )
    counts print

    env.execute("Scala SocketTextStreamWordCount Example")
  }
}
4

1 に答える 1

5

問題は、 からWindowedDataStreamへの暗黙の変換があることのようDataStreamです。この暗黙的な変換は、 を呼び出しflatten()ますWindowedDataStream

あなたの場合、コードは次のように展開されます。

val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
    .filter { (x:String) => x.nonEmpty }      
    .window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
    .flatten()   
    .map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }    

コレクションflatten()の に似ていることは何ですか。flatMap()コレクションのコレクション ( [[a,b,c], [d,e,f]]) と見なすことができるウィンドウのストリームを受け取り、それを要素のストリームに変換します: [a,b,c,d,e,f]

これは、カウントが実際にウィンドウ化され、「ウィンドウ化解除」された元のストリームでのみ動作することを意味します。これは、まったくウィンドウ化されていないようです。

これは問題であり、すぐに修正に取り組みます。(私は Flink コミッターの 1 人です。) ここで進行状況を追跡できます: https://issues.apache.org/jira/browse/FLINK-2096

現在の API でそれを行う方法は次のとおりです。

val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
    .filter { (x:String) => x.nonEmpty }   
    .map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }    
    .window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))

WindowedDataStreamsum() メソッドがあるため、 flatten() 呼び出しの暗黙的な挿入はありません。残念ながら、count()は利用できないため、タプルにフィールドをWindowedDataStream手動で追加してカウントする必要があります。1

于 2015-05-26T15:32:58.287 に答える