0

他のユーザーと共有しているクラスターで Spark を使用しています。したがって、実行時間だけに基づいて、どのコードがより効率的に実行されるかを判断することは信頼できません。私がより効率的なコードを実行しているとき、他の誰かが巨大なデータを実行している可能性があり、私のコードを長時間実行させているからです。

では、ここで 2 つの質問をしてもよろしいでしょうか。

  1. join関数を使用して 2 に参加していましたが、次のように を使用する前に使用RDDsしようとしています:groupByKey()join

    rdd1.groupByKey().join(rdd2)
    

    もっと時間がかかったように見えますが、Hadoop Hive を使用していたとき、グループによってクエリの実行が高速化されたことを覚えています。Spark は遅延評価を使用しているため、groupByKey以前より高速になるかどうか疑問に思ってjoinいます

  2. Spark に SQL モジュールがあることに気付きました。今のところ実際に試す時間はありませんが、SQL モジュールと RDD SQL のような関数の違いは何ですか?

4

2 に答える 2

5
  1. groupByKey続いて単独joinよりも速くなる正当な理由はありませんjoinrdd1と とrdd2が 1 つまたは複数のパーティショナーを持っていない場合、制限要因は単純に に必要なシャッフルですHashPartitioning

    を使用groupByKeyすると、グループ化に必要な変更可能なバッファーを保持することで総コストが増加するだけでなく、さらに重要なことは、より複雑な DAG をもたらす追加の変換を使用することです。groupByKey+ join:

    rdd1 = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
    rdd2 = sc.parallelize([("a", 5), ("c", 6), ("b", 7)])
    rdd1.groupByKey().join(rdd2)
    

    ここに画像の説明を入力

    join単独:

    rdd1.join(rdd2)
    

    ここに画像の説明を入力

    flatMap最後に、これら 2 つのプランは等価ではなく、同じ結果を得るには、最初のプランに追加する必要があります。

  2. これは非常に幅広い質問ですが、主な違いを強調するには:

    • PairwiseRDDsTuple2任意の要素の同種のコレクションです。デフォルトの操作では、キーを意味のある方法でハッシュ可能にする必要があります。それ以外の場合、タイプに関する厳密な要件はありません。対照的に、DataFrame はより動的な型付けを示しますが、各列には、サポートされている定義済みの型のセットからの値のみを含めることができます。UDTを定義することは可能ですが、それでも基本的なものを使用して表現する必要があります。

    • DataFrame は、論理的および物理的な実行計画を生成するCatalyst Optimizerを使用し、手動で低レベルの最適化を適用する必要なく、高度に最適化されたクエリを生成できます。RDD ベースの操作は、単に依存関係 DAG に従います。これは、カスタムの最適化を行わないとパフォーマンスが低下することを意味しますが、実行の制御が大幅に向上し、段階的な微調整が可能になる可能性があります。

その他の注意事項:

于 2015-10-26T11:06:14.243 に答える
4

私はzero323の答えにほぼ同意しますjoingroupByKey. groupByKeyデータの量を減らし、データをキーで分割します。これらは両方とも、後続の のパフォーマンスに役立ちますjoin

前者(データサイズの縮小)は重要ではないと思います。後者 (パーティショニング) の利点を享受するには、他の RDD を同じ方法でパーティショニングする必要があります。

例えば:

val a = sc.parallelize((1 to 10).map(_ -> 100)).groupByKey()
val b = sc.parallelize((1 to 10).map(_ -> 100)).partitionBy(a.partitioner.get)
a.join(b).collect

DAG ビジュアライゼーション

于 2015-10-26T12:54:48.200 に答える