32

spark 2.0.1 から始めて、いくつか質問がありました。多くのドキュメントを読みましたが、これまでのところ十分な答えが見つかりませんでした:

  • 違いは何ですか
    • df.select("foo")
    • df.select($"foo")
  • 私はそれを正しく理解していますか
    • myDataSet.map(foo.someVal)タイプセーフであり、変換されませんRDDが、DataSet 表現にとどまります / 追加のオーバーヘッドはありません (2.0.0 のパフォーマンスに関して)
  • select などの他のすべてのコマンドは、単なるシンタックス シュガーです。タイプセーフではなく、代わりにマップを使用できます。df.select("foo")map ステートメントなしでタイプセーフにする にはどうすればよいですか?
    • マップの代わりに UDF / UADF を使用する必要があるのはなぜですか (マップがデータセット表現にとどまると仮定して)?
4

3 に答える 3

37
  1. df.select("foo") との違いdf.select($"foo")は署名です。前者は少なくとも 1 つString、後者は 0 個以上かかりますColumns。それ以上の実用的な違いはありません。
  2. myDataSet.map(foo.someVal)型はチェックしますが、オブジェクトをDataset使用する操作と同様に、操作に比べてかなりのオーバーヘッドがあります。簡単な例を見てみましょう。RDDDataFrame

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    
    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    

    ご覧のとおり、この実行計画にはすべてのフィールドへのアクセスが必要であり、DeserializeToObject.

  3. いいえ。一般に、他のメソッドはシンタックス シュガーではなく、大幅に異なる実行計画を生成します。例えば:

    ds.select($"foo").explain
    
    == Physical Plan ==
    LocalTableScan [foo#117]
    

    列に直接アクセスできるようになる前に示した計画と比較して。これは、API の制限ではなく、操作上のセマンティクスの違いによるものです。

  4. map ステートメントなしで df.select("foo") タイプセーフにするにはどうすればよいですか?

    そのようなオプションはありません。型指定された列を使用すると、Dataset別の静的に型指定された に静的に変換できDatasetます。

    ds.select($"bar".as[Int])
    

    タイプセーフはありません。型指定された集計など、型安全に最適化された操作を含める試みは他にもいくつかありますが、この実験的な API は.

  5. マップの代わりに UDF / UADF を使用する理由

    それは完全にあなた次第です。Spark の各分散データ構造には、独自の長所と短所があります (たとえば、Spark UDAF with ArrayType as bufferSchema のパフォーマンスの問題を参照してください)。

個人的には、静的型付けDatasetが最も役に立たないと思います。

  • . _ Dataset[Row]_DataFrame

  • 型指定された変換はブラック ボックスであり、オプティマイザーの分析バリアを効果的に作成します。たとえば、選択 (フィルター) を型指定された変換にプッシュすることはできません。

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
    
    == Physical Plan ==
    *Filter (foo#133 = 1)
    +- *Filter <function1>.apply
       +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
          +- Exchange hashpartitioning(foo#133, 200)
             +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                +- LocalTableScan [foo#133, bar#134]
    

    に比べ:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
    
    == Physical Plan ==
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
    +- Exchange hashpartitioning(foo#133, 200)
       +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
          +- *Filter (foo#133 = 1)
             +- LocalTableScan [foo#133, bar#134] 
    

    これは、述語のプッシュダウンやプロジェクションのプッシュダウンなどの機能に影響します。

  • RDDsネイティブでサポートされているタイプの小さなサブセットのみを使用するほど柔軟ではありません。

  • メソッドを使用して変換された場合、「型の安全性」Encodersは議論の余地があります。データ形状は署名を使用してエンコードされていないため、コンパイラは.DatasetasEncoder

関連する質問:

于 2016-11-15T05:48:17.863 に答える