RDD
Apache Spark のとDataFrame
(Spark 2.0.0 DataFrame は の単なるタイプ エイリアスですDataset[Row]
)の違いは何ですか?
一方を他方に変換できますか?
RDD
Apache Spark のとDataFrame
(Spark 2.0.0 DataFrame は の単なるタイプ エイリアスですDataset[Row]
)の違いは何ですか?
一方を他方に変換できますか?
ADataFrame
は、「DataFrame 定義」の Google 検索で適切に定義されています。
データ フレームはテーブル、または 2 次元配列のような構造であり、各列には 1 つの変数の測定値が含まれ、各行には 1 つのケースが含まれます。
そのDataFrame
ため、表形式であるため、追加のメタデータがあり、Spark は最終的なクエリで特定の最適化を実行できます。
RDD
一方、 は、それに対して実行できる操作がそれほど制約されていないため、最適化できないデータのブラックボックスにすぎない、回復力のある分散型データセットにすぎません。
RDD
ただし、メソッドを介してDataFrame から に移動したり、メソッドを介して(RDD が表形式の場合) から (RDD が表形式の場合) にrdd
移動したりできます。RDD
DataFrame
toDF
一般DataFrame
に、クエリの最適化が組み込まれているため、可能な場合は を使用することをお勧めします。
まず
DataFrame
、 から進化しましたSchemaRDD
。
はい.. と の間の変換Dataframe
はRDD
絶対に可能です。
以下に、サンプル コードの一部を示します。
df.rdd
はRDD[Row]
以下は、データフレームを作成するためのいくつかのオプションです。
1)yourrddOffrow.toDF
に変換しDataFrame
ます。
2) createDataFrame
SQL コンテキストの使用
val df = spark.createDataFrame(rddOfRow, schema)
スキーマは、nice SO post で説明されているように、以下のオプションのいくつかから取得できます
。scala ケース クラスと scala リフレクション API からimport org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]
または使用
Encoders
import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema
スキーマで説明されているように、 と を使用して作成することもでき
StructType
ますStructField
val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("col1", DoubleType, true)) .add(StructField("col2", DoubleType, true)) etc...
実際、現在 3 つの Apache Spark API があります。
RDD
API :(
RDD
Resilient Distributed Dataset) API は、1.0 リリース以降、Spark に含まれています。API には、データの計算を実行するための ( )、 () 、()
RDD
などの多くの変換メソッドが用意されています。これらの各メソッドは、変換されたデータを表す new を生成します。ただし、これらのメソッドは実行する操作を定義しているだけであり、アクション メソッドが呼び出されるまで変換は実行されません。アクションメソッドの例は() と () です。map
filter
reduce
RDD
collect
saveAsObjectFile
RDD の例:
rdd.filter(_.age > 21) // transformation
.map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action
例: RDD を使用した属性によるフィルター
rdd.filter(_.age > 21)
DataFrame
API
DataFrame
Spark 1.3 では、Sparkのパフォーマンスとスケーラビリティの向上を目指す Project Tungsten イニシアチブの一環として、新しい API が導入されました。API は、データを記述するスキーマのDataFrame
概念を導入し、Spark がスキーマを管理し、Java シリアル化を使用するよりもはるかに効率的な方法でノード間でデータのみを渡すことを可能にします。API は、Spark の Catalyst オプティマイザーが実行できるリレーショナル クエリ プランを構築するための API であるため、APIと
DataFrame
は根本的に異なります。RDD
API は、クエリ プランの構築に精通している開発者にとって自然なものです。
SQL スタイルの例:
df.filter("age > 21");
制限 : コードはデータ属性を名前で参照しているため、コンパイラがエラーをキャッチすることはできません。属性名が正しくない場合、クエリ プランが作成される実行時にのみエラーが検出されます。
APIのもう 1 つの欠点は、DataFrame
非常にスカラ中心であり、Java をサポートしているものの、サポートが限られていることです。
たとえばDataFrame
、既存RDD
の Java オブジェクトから を作成する場合、Spark の Catalyst オプティマイザーはスキーマを推測できず、DataFrame 内のすべてのオブジェクトがscala.Product
インターフェイスを実装していると想定します。Scalacase class
はこのインターフェースを実装しているため、すぐに使用できます。
Dataset
API
Dataset
Spark 1.6 で API プレビューとしてリリースされた API は、両方の長所を提供することを目的としています。使い慣れたオブジェクト指向プログラミング スタイルとRDD
API のコンパイル時の型安全性に加えて、Catalyst クエリ オプティマイザーのパフォーマンス上の利点も備えています。データセットは、API と同じ効率的なオフヒープ ストレージ メカニズムも使用しDataFrame
ます。データのシリアル化に関しては、 API には、 JVM 表現 (オブジェクト) と Spark の内部バイナリ形式の間で変換するエンコーダ
Dataset
の概念があり ます。Spark には組み込みのエンコーダーがあり、バイトコードを生成してオフヒープ データとやり取りし、オブジェクト全体を逆シリアル化することなく個々の属性へのオンデマンド アクセスを提供するという点で非常に高度です。Spark は、カスタム エンコーダーを実装するための API をまだ提供していませんが、将来のリリースで計画されています。さらに、
Dataset
API は Java と Scala の両方で同様に機能するように設計されています。Java オブジェクトを操作する場合、それらが完全に Bean に準拠していることが重要です。
Dataset
API SQL スタイルの例:
dataset.filter(_.age < 21);
カタリストレベルの流れ。.(Spark Summit からの DataFrame と Dataset のプレゼンテーションのわかりやすい説明)
さらに読む... databricks記事 - 3 つの Apache Spark API の物語: RDD と DataFrames および Datasets
Apache Spark は 3 種類の API を提供します
これは、RDD、データフレーム、およびデータセットの API の比較です。
Spark が提供する主な抽象化は、回復力のある分散データセット (RDD) です。RDD は、並列で操作できるクラスターのノード間で分割された要素のコレクションです。
分散コレクション:
RDD は MapReduce 操作を使用します。これは、クラスター上で並列分散アルゴリズムを使用して大規模なデータセットを処理および生成するために広く採用されています。これにより、ユーザーは一連の高レベルの演算子を使用して、作業の分散やフォールト トレランスについて心配することなく、並列計算を作成できます。
不変:分割されたレコードのコレクションで構成される RDD。パーティションは、RDD における並列処理の基本単位であり、各パーティションはデータの 1 つの論理分割であり、不変であり、既存のパーティションでいくつかの変換を介して作成されます。不変性は、計算の一貫性を実現するのに役立ちます。
フォールト トレラント: RDD の一部のパーティションが失われた場合、複数のノード間でデータのレプリケーションを行うのではなく、同じ計算を行うために、そのパーティションの変換を系譜で再生できます。データ管理と複製に多くの労力を費やし、より高速な計算を実現します。
遅延評価: Spark のすべての変換は、結果をすぐに計算しないという点で遅延です。代わりに、一部の基本データセットに適用された変換を覚えているだけです。変換は、アクションがドライバー プログラムに結果を返す必要がある場合にのみ計算されます。
機能変換: RDD は、既存のデータセットから新しいデータセットを作成する変換と、データセットで計算を実行した後にドライバー プログラムに値を返すアクションの 2 種類の操作をサポートします。
データ処理フォーマット:
構造化データと非構造化データを簡単かつ効率的に処理できます。
サポートされるプログラミング言語:
RDD API は、Java、Scala、Python、および R で使用できます。
組み込みの最適化エンジンなし: 構造化データを扱う場合、RDD は、Catalyst オプティマイザーや Tungsten 実行エンジンなどの Spark の高度なオプティマイザーを利用できません。開発者は、その属性に基づいて各 RDD を最適化する必要があります。
構造化データの処理: Dataframe やデータセットとは異なり、RDD は取り込まれたデータのスキーマを推測せず、ユーザーがそれを指定する必要があります。
Spark は、Spark 1.3 リリースでデータフレームを導入しました。Dataframe は、RDD が抱えていた主要な課題を克服します。
DataFrame は、名前付きの列に編成されたデータの分散コレクションです。これは、リレーショナル データベースまたは R/Python データフレームのテーブルと概念的に同等です。Dataframe とともに、Spark は、高度なプログラミング機能を活用して拡張可能なクエリ オプティマイザーを構築する Catalyst オプティマイザーも導入しました。
行オブジェクトの分散コレクション: DataFrame は、名前付き列に編成されたデータの分散コレクションです。概念的にはリレーショナル データベースのテーブルと同等ですが、内部ではより高度な最適化が行われています。
データ処理: 構造化および非構造化データ形式 (Avro、CSV、エラスティック サーチ、Cassandra) およびストレージ システム (HDFS、HIVE テーブル、MySQL など) の処理。これらのさまざまなデータソースすべてから読み書きできます。
Catalystオプティマイザーを使用した最適化: SQL クエリと DataFrame API の両方を強化します。データフレームは、4 つのフェーズで触媒ツリー変換フレームワークを使用します。
1.Analyzing a logical plan to resolve references
2.Logical plan optimization
3.Physical planning
4.Code generation to compile parts of the query to Java bytecode.
Hive の互換性: Spark SQL を使用すると、既存の Hive ウェアハウスで変更されていない Hive クエリを実行できます。Hive フロントエンドと MetaStore を再利用し、既存の Hive データ、クエリ、および UDF との完全な互換性を提供します。
Tungsten: Tungsten は、メモリーを明示的に管理し、式評価用のバイトコードを動的に生成する物理実行バックエンドを提供します。
サポートされるプログラミング言語:
Dataframe API は、Java、Scala、Python、および R で使用できます。
例:
case class Person(name : String , age : Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
これは、いくつかの変換および集約ステップを使用している場合に特に困難です。
例:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
データセット API は、タイプ セーフなオブジェクト指向プログラミング インターフェイスを提供する DataFrames の拡張機能です。これは、リレーショナル スキーマにマップされるオブジェクトの厳密に型指定された不変のコレクションです。
Dataset の中核にある API は、エンコーダーと呼ばれる新しい概念であり、JVM オブジェクトと表形式の間の変換を担当します。表形式の表現は、Spark の内部 Tungsten バイナリ形式を使用して保存されるため、シリアル化されたデータの操作とメモリ使用率の向上が可能になります。Spark 1.6 には、プリミティブ型 (String、Integer、Long など)、Scala ケース クラス、および Java Beans を含む、さまざまな型のエンコーダーの自動生成がサポートされています。
RDD と Dataframe の両方の長所を提供: RDD (関数型プログラミング、タイプ セーフ)、DataFrame (リレーショナル モデル、クエリの最適化、Tungsten 実行、並べ替えとシャッフル)
エンコーダー: エンコーダーを使用すると、任意の JVM オブジェクトをデータセットに簡単に変換できるため、ユーザーはデータフレームとは異なり、構造化データと非構造化データの両方を操作できます。
サポートされているプログラミング言語: Datasets API は現在、Scala と Java でのみ使用できます。Python と R は現在、バージョン 1.6 ではサポートされていません。Python のサポートは、バージョン 2.0 で予定されています。
型の安全性: データセット API は、データフレームでは利用できなかったコンパイル時の安全性を提供します。以下の例では、Dataset がコンパイル ラムダ関数を使用してドメイン オブジェクトを操作する方法を確認できます。
例:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
例:
ds.select(col("name").as[String], $"age".as[Int]).collect()
Python と R のサポートなし: リリース 1.6 の時点で、データセットは Scala と Java のみをサポートします。Python のサポートは、Spark 2.0 で導入されます。
Datasets API は、既存の RDD および Dataframe API より優れた型安全性と関数型プログラミングを備えたいくつかの利点をもたらします。
A Dataframe is an RDD of Row objects, each representing a record. A Dataframe also knows the schema (i.e., data fields) of its rows. While Dataframes look like regular RDDs, internally they store data in a more efficient manner, taking advantage of their schema. In addition, they provide new operations not available on RDDs, such as the ability to run SQL queries. Dataframes can be created from external data sources, from the results of queries, or from regular RDDs.
Reference: Zaharia M., et al. Learning Spark (O'Reilly, 2015)