313

RDDApache Spark のとDataFrame (Spark 2.0.0 DataFrame は の単なるタイプ エイリアスですDataset[Row])の違いは何ですか?

一方を他方に変換できますか?

4

14 に答える 14

268

ADataFrameは、「DataFrame 定義」の Google 検索で適切に定義されています。

データ フレームはテーブル、または 2 次元配列のような構造であり、各列には 1 つの変数の測定値が含まれ、各行には 1 つのケースが含まれます。

そのDataFrameため、表形式であるため、追加のメタデータがあり、Spark は最終的なクエリで特定の最適化を実行できます。

RDD一方、 は、それに対して実行できる操作がそれほど制約されていないため、最適化できないデータのブラックボックスにすぎない、回復ある分散型データセットにすぎません。

RDDただし、メソッドを介してDataFrame から に移動したり、メソッドを介して(RDD が表形式の場合) から (RDD が表形式の場合) にrdd移動したりできます。RDDDataFrametoDF

一般DataFrameに、クエリの最適化が組み込まれているため、可能な場合は を使用することをお勧めします。

于 2015-07-20T03:09:05.453 に答える
263

まずDataFrame、 から進化しましたSchemaRDD

非推奨のメソッド toSchemaRDD

はい.. と の間の変換DataframeRDD絶対に可能です。

以下に、サンプル コードの一部を示します。

  • df.rddRDD[Row]

以下は、データフレームを作成するためのいくつかのオプションです。

  • 1)yourrddOffrow.toDFに変換しDataFrameます。

  • 2) createDataFrameSQL コンテキストの使用

    val df = spark.createDataFrame(rddOfRow, schema)

スキーマは、nice SO post で説明されているように、以下のオプションのいくつかから取得できます
。scala ケース クラスと scala リフレクション API から

import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]

または使用Encoders

import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema

スキーマで説明されているように、 と を使用して作成することもできStructTypeます StructField

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("col1", DoubleType, true))
  .add(StructField("col2", DoubleType, true)) etc...

画像説明

実際、現在 3 つの Apache Spark API があります。

ここに画像の説明を入力

  1. RDDAPI :

( RDDResilient Distributed Dataset) API は、1.0 リリース以降、Spark に含まれています。

API には、データの計算を実行するための ( )、 () 、()RDDなどの多くの変換メソッドが用意されています。これらの各メソッドは、変換されたデータを表す new を生成します。ただし、これらのメソッドは実行する操作を定義しているだけであり、アクション メソッドが呼び出されるまで変換は実行されません。アクションメソッドの例は() と () です。mapfilterreduceRDDcollectsaveAsObjectFile

RDD の例:

rdd.filter(_.age > 21) // transformation
   .map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action

例: RDD を使用した属性によるフィルター

rdd.filter(_.age > 21)
  1. DataFrameAPI

DataFrameSpark 1.3 では、Sparkのパフォーマンスとスケーラビリティの向上を目指す Project Tungsten イニシアチブの一環として、新しい API が導入されました。API は、データを記述するスキーマのDataFrame概念を導入し、Spark がスキーマを管理し、Java シリアル化を使用するよりもはるかに効率的な方法でノード間でデータのみを渡すことを可能にします。

API は、Spark の Catalyst オプティマイザーが実行できるリレーショナル クエリ プランを構築するための API であるため、APIとDataFrameは根本的に異なります。RDDAPI は、クエリ プランの構築に精通している開発者にとって自然なものです。

SQL スタイルの例:

df.filter("age > 21");

制限 : コードはデータ属性を名前で参照しているため、コンパイラがエラーをキャッチすることはできません。属性名が正しくない場合、クエリ プランが作成される実行時にのみエラーが検出されます。

APIのもう 1 つの欠点は、DataFrame非常にスカラ中心であり、Java をサポートしているものの、サポートが限られていることです。

たとえばDataFrame、既存RDDの Java オブジェクトから を作成する場合、Spark の Catalyst オプティマイザーはスキーマを推測できず、DataFrame 内のすべてのオブジェクトがscala.Productインターフェイスを実装していると想定します。Scalacase classはこのインターフェースを実装しているため、すぐに使用できます。

  1. DatasetAPI

DatasetSpark 1.6 で API プレビューとしてリリースされた API は、両方の長所を提供することを目的としています。使い慣れたオブジェクト指向プログラミング スタイルとRDDAPI のコンパイル時の型安全性に加えて、Catalyst クエリ オプティマイザーのパフォーマンス上の利点も備えています。データセットは、API と同じ効率的なオフヒープ ストレージ メカニズムも使用し DataFrameます。

データのシリアル化に関しては、 API には、 JVM 表現 (オブジェクト) と Spark の内部バイナリ形式の間で変換するエンコーダDatasetの概念があり ます。Spark には組み込みのエンコーダーがあり、バイトコードを生成してオフヒープ データとやり取りし、オブジェクト全体を逆シリアル化することなく個々の属性へのオンデマンド アクセスを提供するという点で非常に高度です。Spark は、カスタム エンコーダーを実装するための API をまだ提供していませんが、将来のリリースで計画されています。

さらに、DatasetAPI は Java と Scala の両方で同様に機能するように設計されています。Java オブジェクトを操作する場合、それらが完全に Bean に準拠していることが重要です。

DatasetAPI SQL スタイルの例:

dataset.filter(_.age < 21);

評価差。間DataFrame& DataSet: ここに画像の説明を入力

カタリストレベルの流れ。.(Spark Summit からの DataFrame と Dataset のプレゼンテーションのわかりやすい説明) ここに画像の説明を入力

さらに読む... databricks記事 - 3 つの Apache Spark API の物語: RDD と DataFrames および Datasets

于 2016-08-19T07:23:53.477 に答える
153

Apache Spark は 3 種類の API を提供します

  1. RDD
  2. データフレーム
  3. データセット

RDD、データフレーム、データセット API の比較

これは、RDD、データフレーム、およびデータセットの API の比較です。

RDD

Spark が提供する主な抽象化は、回復力のある分散データセット (RDD) です。RDD は、並列で操作できるクラスターのノード間で分割された要素のコレクションです。

RDD 機能:-

  • 分散コレクション:
    RDD は MapReduce 操作を使用します。これは、クラスター上で並列分散アルゴリズムを使用して大規模なデータセットを処理および生成するために広く採用されています。これにより、ユーザーは一連の高レベルの演算子を使用して、作業の分散やフォールト トレランスについて心配することなく、並列計算を作成できます。

  • 不変:分割されたレコードのコレクションで構成される RDD。パーティションは、RDD における並列処理の基本単位であり、各パーティションはデータの 1 つの論理分割であり、不変であり、既存のパーティションでいくつかの変換を介して作成されます。不変性は、計算の一貫性を実現するのに役立ちます。

  • フォールト トレラント: RDD の一部のパーティションが失われた場合、複数のノード間でデータのレプリケーションを行うのではなく、同じ計算を行うために、そのパーティションの変換を系譜で再生できます。データ管理と複製に多くの労力を費やし、より高速な計算を実現します。

  • 遅延評価: Spark のすべての変換は、結果をすぐに計算しないという点で遅延です。代わりに、一部の基本データセットに適用された変換を覚えているだけです。変換は、アクションがドライバー プログラムに結果を返す必要がある場合にのみ計算されます。

  • 機能変換: RDD は、既存のデータセットから新しいデータセットを作成する変換と、データセットで計算を実行した後にドライバー プログラムに値を返すアクションの 2 種類の操作をサポートします。

  • データ処理フォーマット:
    構造化データと非構造化データを簡単かつ効率的に処理できます。

  • サポートされるプログラミング言語:
    RDD API は、Java、Scala、Python、および R で使用できます。

RDD の制限:-

  • 組み込みの最適化エンジンなし: 構造化データを扱う場合、RDD は、Catalyst オプティマイザーや Tungsten 実行エンジンなどの Spark の高度なオプティマイザーを利用できません。開発者は、その属性に基づいて各 RDD を最適化する必要があります。

  • 構造化データの処理: Dataframe やデータセットとは異なり、RDD は取り込まれたデータのスキーマを推測せず、ユーザーがそれを指定する必要があります。

データフレーム

Spark は、Spark 1.3 リリースでデータフレームを導入しました。Dataframe は、RDD が抱えていた主要な課題を克服します。

DataFrame は、名前付きの列に編成されたデータの分散コレクションです。これは、リレーショナル データベースまたは R/Python データフレームのテーブルと概念的に同等です。Dataframe とともに、Spark は、高度なプログラミング機能を活用して拡張可能なクエリ オプティマイザーを構築する Catalyst オプティマイザーも導入しました。

データフレーム機能:-

  • 行オブジェクトの分散コレクション: DataFrame は、名前付き列に編成されたデータの分散コレクションです。概念的にはリレーショナル データベースのテーブルと同等ですが、内部ではより高度な最適化が行われています。

  • データ処理: 構造化および非構造化データ形式 (Avro、CSV、エラスティック サーチ、Cassandra) およびストレージ システム (HDFS、HIVE テーブル、MySQL など) の処理。これらのさまざまなデータソースすべてから読み書きできます。

  • Catalystオプティマイザーを使用した最適化: SQL クエリと DataFrame API の両方を強化します。データフレームは、4 つのフェーズで触媒ツリー変換フレームワークを使用します。

     1.Analyzing a logical plan to resolve references
     2.Logical plan optimization
     3.Physical planning
     4.Code generation to compile parts of the query to Java bytecode.
    
  • Hive の互換性: Spark SQL を使用すると、既存の Hive ウェアハウスで変更されていない Hive クエリを実行できます。Hive フロントエンドと MetaStore を再利用し、既存の Hive データ、クエリ、および UDF との完全な互換性を提供します。

  • Tungsten: Tungsten は、メモリーを明示的に管理し、式評価用のバイトコードを動的に生成する物理実行バックエンドを提供します。

  • サポートされるプログラミング言語:
    Dataframe API は、Java、Scala、Python、および R で使用できます。

データフレームの制限:-

  • コンパイル時の型の安全性: 説明したように、Dataframe API はコンパイル時の安全性をサポートしていないため、構造が不明な場合にデータを操作することが制限されます。次の例は、コンパイル時に機能します。ただし、このコードを実行するとランタイム例外が発生します。

例:

case class Person(name : String , age : Int) 
val dataframe = sqlContext.read.json("people.json") 
dataframe.filter("salary > 10000").show 
=> throws Exception : cannot resolve 'salary' given input age , name

これは、いくつかの変換および集約ステップを使用している場合に特に困難です。

  • ドメイン オブジェクトを操作できません (失われたドメイン オブジェクト): ドメイン オブジェクトをデータフレームに変換すると、そこから再生成することはできません。次の例では、personRDD から personDF を作成すると、元の Person クラスの RDD (RDD[Person]) は復元されません。

例:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]

データセット API

データセット API は、タイプ セーフなオブジェクト指向プログラミング インターフェイスを提供する DataFrames の拡張機能です。これは、リレーショナル スキーマにマップされるオブジェクトの厳密に型指定された不変のコレクションです。

Dataset の中核にある API は、エンコーダーと呼ばれる新しい概念であり、JVM オブジェクトと表形式の間の変換を担当します。表形式の表現は、Spark の内部 Tungsten バイナリ形式を使用して保存されるため、シリアル化されたデータの操作とメモリ使用率の向上が可能になります。Spark 1.6 には、プリミティブ型 (String、Integer、Long など)、Scala ケース クラス、および Java Beans を含む、さまざまな型のエンコーダーの自動生成がサポートされています。

データセットの特徴:-

  • RDD と Dataframe の両方の長所を提供: RDD (関数型プログラミング、タイプ セーフ)、DataFrame (リレーショナル モデル、クエリの最適化、Tungsten 実行、並べ替えとシャッフル)

  • エンコーダー: エンコーダーを使用すると、任意の JVM オブジェクトをデータセットに簡単に変換できるため、ユーザーはデータフレームとは異なり、構造化データと非構造化データの両方を操作できます。

  • サポートされているプログラミング言語: Datasets API は現在、Scala と Java でのみ使用できます。Python と R は現在、バージョン 1.6 ではサポートされていません。Python のサポートは、バージョン 2.0 で予定されています。

  • 型の安全性: データセット API は、データフレームでは利用できなかったコンパイル時の安全性を提供します。以下の例では、Dataset がコンパイル ラムダ関数を使用してドメイン オブジェクトを操作する方法を確認できます。

例:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
 // error : value salary is not a member of person
ds.rdd // returns RDD[Person]
  • 相互運用性:データセットを使用すると、定型コードなしで既存の RDD とデータフレームをデータセットに簡単に変換できます。

データセット API の制限:-

  • 文字列への型キャストが必要: 現在、データセットからデータをクエリするには、クラス内のフィールドを文字列として指定する必要があります。データをクエリしたら、必要なデータ型に列をキャストする必要があります。一方、データセットでマップ操作を使用する場合、Catalyst オプティマイザーは使用されません。

例:

ds.select(col("name").as[String], $"age".as[Int]).collect()

Python と R のサポートなし: リリース 1.6 の時点で、データセットは Scala と Java のみをサポートします。Python のサポートは、Spark 2.0 で導入されます。

Datasets API は、既存の RDD および Dataframe API より優れた型安全性と関数型プログラミングを備えたいくつかの利点をもたらします。

于 2016-09-22T05:50:54.423 に答える
6

A Dataframe is an RDD of Row objects, each representing a record. A Dataframe also knows the schema (i.e., data fields) of its rows. While Dataframes look like regular RDDs, internally they store data in a more efficient manner, taking advantage of their schema. In addition, they provide new operations not available on RDDs, such as the ability to run SQL queries. Dataframes can be created from external data sources, from the results of queries, or from regular RDDs.

Reference: Zaharia M., et al. Learning Spark (O'Reilly, 2015)

于 2016-10-20T13:27:26.463 に答える