最初にデータセット API を使用して静的データを操作し、次に DataStream API を使用してストリーミング ジョブを実行したいと考えています。IDE でコードを書くと、完全に機能します。しかし、ローカルの flink jobmanager (すべての並列処理 1) で実行しようとすると、ストリーミング コードが実行されません!
たとえば、次のコードは機能しません。
val parallelism = 1
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)
val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)
val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head
val theStream = env.fromElements(1).iterate( iteretion => {
val result = iteretion.map(x => x + myVal)
(result, result)
})
theStream.print()
env.execute("static and streaming together")
このことを機能させるにはどうすればよいですか?
ログ:上記プログラムの実行ログ
実行計画: plan 非周期的なようです。