5

action私の理解では、Sparkにはそれぞれに 1 つのジョブがあります。
しかし、1 つのアクションに対して複数のジョブがトリガーされることがよくあります。データセットで単純な集計を行って各カテゴリから最大値を取得することでこれをテストしようとしていました(ここでは「件名」フィールド)

Spark UI を調べていると、操作のために 3 つの「ジョブ」が実行されていることがわかりますが、groupBy1 つだけを期待していました。
1つではなく3つある理由を理解するのを手伝ってくれる人はいますか?

   students.show(5)

    +----------+--------------+----------+----+-------+-----+-----+
    |student_id|exam_center_id|   subject|year|quarter|score|grade|
    +----------+--------------+----------+----+-------+-----+-----+
    |         1|             1|      Math|2005|      1|   41|    D|
    |         1|             1|   Spanish|2005|      1|   51|    C|
    |         1|             1|    German|2005|      1|   39|    D|
    |         1|             1|   Physics|2005|      1|   35|    D|
    |         1|             1|   Biology|2005|      1|   53|    C|
    |         1|             1|Philosophy|2005|      1|   73|    B|
    

  // Task : Find Highest Score in each subject
  val highestScores = students.groupBy("subject").max("score")
  highestScores.show(10)

+----------+----------+
|   subject|max(score)|
+----------+----------+
|   Spanish|        98|
|Modern Art|        98|
|    French|        98|
|   Physics|        98|
| Geography|        98|
|   History|        98|
|   English|        98|
|  Classics|        98|
|      Math|        98|
|Philosophy|        98|
+----------+----------+
only showing top 10 rows

Spark UI を調べていると、操作のために 3 つの「ジョブ」が実行されていることがわかりますが、groupBy1 つだけを期待していました。 ここに画像の説明を入力

ここに画像の説明を入力 1つではなく3つある理由を理解するのを手伝ってくれる人はいますか?

== Physical Plan ==
*(2) HashAggregate(keys=[subject#12], functions=[max(score#15)])
+- Exchange hashpartitioning(subject#12, 1)
   +- *(1) HashAggregate(keys=[subject#12], functions=[partial_max(score#15)])
      +- *(1) FileScan csv [subject#12,score#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/lab/SparkLab/files/exams/students.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<subject:string,score:int>
4

2 に答える 2

0

物理的な計画を確認することをお勧めします-

highestScores.explain()

次のようなものが表示される場合があります-

*(2) HashAggregate(keys=[subject#9], functions=[max(score#12)], output=[subject#9, max(score)#51])
+- Exchange hashpartitioning(subject#9, 2)
   +- *(1) HashAggregate(keys=[subject#9], functions=[partial_max(score#12)], output=[subject#9, max#61])
  1. 【マップステージ】ステージ#1はローカル集約(部分集約)を実現し、シャッフルは を使って行われhashpartitioning(subject)ました。hashpartitioner がgroup by列を使用することに注意してください
  2. [段階を減らす] 段階 2 は、段階 1 の出力をマージして最終的なものを取得することですmax(score)
  3. これは実際に上位 10 件のレコードを出力するために使用されますshow(10)
于 2020-06-28T03:49:29.753 に答える