action
私の理解では、Sparkにはそれぞれに 1 つのジョブがあります。
しかし、1 つのアクションに対して複数のジョブがトリガーされることがよくあります。データセットで単純な集計を行って各カテゴリから最大値を取得することでこれをテストしようとしていました(ここでは「件名」フィールド)
Spark UI を調べていると、操作のために 3 つの「ジョブ」が実行されていることがわかりますが、groupBy
1 つだけを期待していました。
1つではなく3つある理由を理解するのを手伝ってくれる人はいますか?
students.show(5)
+----------+--------------+----------+----+-------+-----+-----+
|student_id|exam_center_id| subject|year|quarter|score|grade|
+----------+--------------+----------+----+-------+-----+-----+
| 1| 1| Math|2005| 1| 41| D|
| 1| 1| Spanish|2005| 1| 51| C|
| 1| 1| German|2005| 1| 39| D|
| 1| 1| Physics|2005| 1| 35| D|
| 1| 1| Biology|2005| 1| 53| C|
| 1| 1|Philosophy|2005| 1| 73| B|
// Task : Find Highest Score in each subject
val highestScores = students.groupBy("subject").max("score")
highestScores.show(10)
+----------+----------+
| subject|max(score)|
+----------+----------+
| Spanish| 98|
|Modern Art| 98|
| French| 98|
| Physics| 98|
| Geography| 98|
| History| 98|
| English| 98|
| Classics| 98|
| Math| 98|
|Philosophy| 98|
+----------+----------+
only showing top 10 rows
Spark UI を調べていると、操作のために 3 つの「ジョブ」が実行されていることがわかりますが、groupBy
1 つだけを期待していました。
1つではなく3つある理由を理解するのを手伝ってくれる人はいますか?
== Physical Plan ==
*(2) HashAggregate(keys=[subject#12], functions=[max(score#15)])
+- Exchange hashpartitioning(subject#12, 1)
+- *(1) HashAggregate(keys=[subject#12], functions=[partial_max(score#15)])
+- *(1) FileScan csv [subject#12,score#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/lab/SparkLab/files/exams/students.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<subject:string,score:int>