2

最初にデータセット API を使用して静的データを操作し、次に DataStream API を使用してストリーミング ジョブを実行したいと考えています。IDE でコードを書くと、完全に機能します。しかし、ローカルの flink jobmanager (すべての並列処理 1) で実行しようとすると、ストリーミング コードが実行されません!

たとえば、次のコードは機能しません。

val parallelism = 1

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)

val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)

val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head

val theStream = env.fromElements(1).iterate( iteretion => {
  val result = iteretion.map(x => x + myVal)
  (result, result)
})
theStream.print()
env.execute("static and streaming together")

このことを機能させるにはどうすればよいですか?

ログ:上記プログラムの実行ログ

実行計画: plan 非周期的なようです。

4

1 に答える 1

4

countcollectまたはによってトリガーされるなど、複数のサブジョブで構成される Flink ジョブがある場合print、Web インターフェース経由でジョブを送信することはできません。Web インターフェースは単一の Flink ジョブのみをサポートします。

于 2016-04-12T14:44:04.540 に答える