ここにSpark 2.1.0-SNAPSHOT (今日ビルド) がありますが、2.0 と現在の間で変更されていないと思います。
$ ./bin/spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0-SNAPSHOT
/_/
Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.
Spark のStructured Streamingでは、ストリーミング アプリケーションは、同じ物理クエリ プランを入力データ ソースに適用する単なるトリックです。
物理的なクエリ プランがあなたを作るものであることに注意してくださいDataset
(そして、Spark SQL を使用すればするほど、クエリとデータセットの間に違いは見られなくなります。最近では、それらは単純に交換可能です)。
構造化されたクエリを記述する場合 (それが 1 回限りのクエリであるかストリーミング クエリであるかに関係なく)、解析、分析、最適化、最終的に物理的な計画の作成という 4 つの段階を経ます。explain(extended = true)
メソッドを使用して確認できます。
scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]
ステージは遅延しており、一度だけ実行されます。
物理的な計画を取得すると、ステージは再度実行されません。パイプDataset
ラインは既に計算されており、不足しているのはパイプを流れるデータだけです。
そのため、ストリーミング クエリ プランが物理プランを生成するために「実行」されたときに、「hello world」が 1 回だけ表示されます。これは一度実行され、ソースを処理するために最適化されましたDataset
(そのDataset
ため、副作用はすでにトリガーされていました)。
興味深いケースです。それをここに持ち出すのは大変です!