5

selectステートメントがバッチごとに印刷されるのはなぜhello worldですか?

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

val in: DataFrame = sparkSession.readStream
 .schema(schema)
 .format("csv")
 .option("header", false)
 .option("maxFilesPerTrigger", 1)
 .option("delimiter", ";")
 .load("s3://xxxxxxxx")

val input: DataFrame = in.select("*")
 .transform { ds =>
   println("hello world")  // <-- Why is this printed out once?
   ds
}

import org.apache.spark.sql.streaming.StreamingQuery
val query: StreamingQuery = input.writeStream
  .format("console")
  .start
4

1 に答える 1

8

ここにSpark 2.1.0-SNAPSHOT (今日ビルド) がありますが、2.0 と現在の間で変更されていないと思います。

$ ./bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.

Spark のStructured Streamingでは、ストリーミング アプリケーションは、同じ物理クエリ プランを入力データ ソースに適用する単なるトリックです。

物理的なクエリ プランがあなたを作るものであることに注意してくださいDataset(そして、Spark SQL を使用すればするほど、クエリとデータセットの間に違いは見られなくなります。最近では、それらは単純に交換可能です)。

構造化されたクエリを記述する場合 (それが 1 回限りのクエリであるかストリーミング クエリであるかに関係なく)、解析、分析、最適化、最終的に物理的な計画の作成という 4 つの段階を経ます。explain(extended = true)メソッドを使用して確認できます。

scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]

ステージは遅延しており、一度だけ実行されます。

物理的な計画を取得すると、ステージは再度実行されません。パイプDatasetラインは既に計算されており、不足しているのはパイプを流れるデータだけです。

そのため、ストリーミング クエリ プランが物理プランを生成するために「実行」されたときに、「hello world」が 1 回だけ表示されます。これは一度実行され、ソースを処理するために最適化されましたDataset(そのDatasetため、副作用はすでにトリガーされていました)。

興味深いケースです。それをここに持ち出すのは大変です!

于 2016-09-13T08:27:33.700 に答える