問題タブ [apache-spark-dataset]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
678 参照

scala - データセット flatmap groupBy 欠落したパラメーター タイプ

数値のリストを含む列を持つデータセットがあります。

これらすべてのリストに含まれるすべての数字の出現回数を数えたいと思います。そこで、flatMap を実行して、すべての整数のセットを取得します。グループ化したいので、各番号を1回だけにしてから、出現回数を追加します(2列目など)。これまでの私のコード:

しかし、「i」にはパラメーターの型がないと常に言われます。私はそれが Int であることを伝える必要があると思いますが、どうすればそれを行うことができますか? それとも、まったく違うものを見逃していますか?

0 投票する
0 に答える
3461 参照

scala - Spark 2.0 の DataFrame で flatMap を実行しようとすると、Dataset に格納されている型のエンコーダが見つかりません

次のコンパイル時エラーが発生し続けます。

Spark v1.6 から v2.0.2 にアップグレードしたところ、使用している多数のコードDataFrameがこのエラーについて不平を言っています。文句を言っているコードは次のようになります。

以前の SO の投稿は、

DataFrameただし、 which is equal to を使用しているため、ケースクラスはありませんDataSet[Row]。また、このメッセージを取り除くための助けを借りずに、次のように2つの暗黙的なインポートをインライン化しました。

DataSetEncoderのドキュメントを見たことに注意してください。ドキュメントには、次のようなことが書かれています。

ただし、私のメソッドにはアクセスできませんSparkSession。また、その行を試してみるとimport spark.implicits._、IntelliJ はそれを見つけることさえできません。私の DataFrame が DataSet[Row] であると言うとき、私は本当にそれを意味します。

この質問は重複の可能性があるとマークされていますが、明確にしてください。

  • ケース クラスまたはビジネス オブジェクトが関連付けられていません。
  • 私は .flatMap を使用していますが、他の質問は .map を使用しています
  • 暗黙のインポートは役に立たないようです
  • RowEncoder を渡すとコンパイル時エラーが発生するdata.flatMap(row => { ... }, RowEncoder(data.schema))(引数が多すぎる)

私は他の投稿を読んでいますが、追加させてください。この新しい Spark 2.0 Datasets/DataFrame API がどのように機能するのかわかりません。Spark シェルでは、以下のコードが機能します。このようにスパークシェルを開始することに注意してください$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

ただし、これをテスト ユニットの一部として実行すると、同じ「エンコーダーが見つかりません」というエラーが発生します。これがシェルでは機能するのに、テスト ユニットでは機能しないのはなぜですか?

シェルで、入力し:imports:implicitsscala ファイル/ソースに配置しましたが、どちらも役に立ちません。

0 投票する
2 に答える
1925 参照

apache-spark-dataset - Null ポインター例外 - Apache Spark データセットの左外部結合

spark データセット (spark 2.0.1) を学習しようとしています。左の外部結合の下では、Null ポインター例外が作成されています。

16/12/14 16:48:26 エラー Executor: ステージ 2.0 のタスク 0.0 で例外 (TID 12) java.lang.NullPointerException

これは、左外部結合を実行しているときに、record._2.depname に null 値が与えられているためです。

これをどのように処理しますか?ありがとう

0 投票する
2 に答える
16227 参照

scala - Spark Dataframes- Reducing By Key

Let's say I have a data structure like this where ts is some timestamp

Given a large number of these records I want to end up with the record with the highest timestamp for each id. Using the RDD api I think the following code gets the job done:

Likewise this is my attempt with datasets:

I've being trying to work out how to achieve something similar with dataframes but to no avail- I realise I can do the grouping with:

But that gives me a RelationGroupedDataSet and it's not clear to me what aggregation function I need to write to achieve what I want- all example aggregations I've seen appear to focus on returning just a single column being aggregated rather than the whole row.

Is it possible to achieve this using dataframes?

0 投票する
1 に答える
3037 参照

apache-spark - Spark DataFrame に対してクエリ/カウントを並列化/分散するにはどうすればよいですか?

DataFrame一連のフィルター クエリを適用する必要があります。たとえば、次のようにロードDataFrameします。

次に、次のような「任意の」フィルターがたくさんあります。

  • C0='true' および C1='false'
  • C0='false' および C3='true'
  • 等々...

私は通常、これらのフィルターを util メソッドを使用して動的に取得します。

これらのフィルタを に適用しDataFrameてカウントを取得するだけです。例えば。

フィルターをマッピングするとき、これは並列/分散操作ではないことに気付きました。フィルターを RDD/DataFrame に貼り付けると、ネストされたデータ フレーム操作を実行することになるため、このアプローチも機能しません (SO で読んだように、Spark では許可されていません)。次のようなものは、NullPointerException (NPE) を返します。

DataFrameSpark でカウント フィルターを並列化/分散する方法はありますか? ところで、私は Spark v2.0.2 を使用しています。

0 投票する
2 に答える
15719 参照

apache-spark - Spark DataSet フィルターのパフォーマンス

型指定されたデータセットをフィルタリングするさまざまな方法を試してきました。パフォーマンスがまったく異なる可能性があることがわかります。

このデータ セットは、33 列と 4226047 行の 1.6 GB 行のデータに基づいて作成されました。csv データを読み込んで DataSet を作成し、case クラスにマッピングします。

UnitId = 'B02' のフィルターは、47980 行を返す必要があります。以下のように 3 つの方法をテストしました。

2)一時テーブルとSQLクエリを使用します(〜オプション1と同じ)

3) 強い型付けされたクラス フィールドを使用する (14,987ms、つまり 30 倍遅い)

Python API で再度テストしたところ、同じデータ セットに対して、タイミングは 17,046 ミリ秒で、scala API オプション 3 のパフォーマンスに匹敵します。

3) と python API が最初の 2 つのオプションとは異なる方法で実行される方法について、誰かが光を当てることができますか?

0 投票する
2 に答える
849 参照

scala - データフレームのnull値をuuidに埋める方法は?

1 つの列 (すべてが null ではない) に null 値を持つデータフレームがあり、null 値を uuid で埋める必要があります。方法はありますか?

私はこの方法を試しましたが、「field2」の各行には同じuuidがあります。

作り方は?1,000,000 行を超える場合

0 投票する
1 に答える
638 参照

apache-spark - RDD に対する DataSet のパフォーマンス上の利点

Spark の DataSetに関するいくつかの優れた記事 ( thisthis、およびthis ) を読んだ後、RDD に対する次の DataSet のパフォーマンス上の利点について説明します。

  1. 論理的および物理的な計画の最適化。
  2. 厳密な類型化;
  3. ベクトル化された操作。
  4. 低レベルのメモリ管理。

質問:

  1. Spark の RDD は、物理的な計画も構築し、同じ段階で複数の変換を結合/最適化できます。では、RDD に対する DataSet の利点は何でしょうか?
  2. 最初のリンクから、の例を見ることができますRDD[Person]DataSet には高度な型付けがありますか?
  3. 「ベクトル化された操作」とはどういう意味ですか?
  4. 私が理解しているように、DataSet の低メモリ管理 = 高度なシリアル化。これは、シリアル化可能なオブジェクトのオフヒープ ストレージを意味し、逆シリアル化せずにオブジェクトの 1 つのフィールドのみを読み取ることができます。しかし、持続戦略がある場合はどうでしょうか? IN_MEMORY_ONLYDataSet はどのような場合でもすべてをシリアル化しますか? RDD よりもパフォーマンス上の利点はありますか?
0 投票する
1 に答える
3308 参照

scala - Spark 2.0 暗黙的エンコーダー、型が Option[Seq[String]] (scala) の場合に欠落している列を処理する

Option[Seq[String]] 型のいくつかの列がデータ ソースにない場合、データのエンコードに問題があります。理想的には、欠落している列データを で埋めたいと思いますNone

シナリオ:

column1を含むがcolumn2を含まないいくつかの寄木細工のファイルを読み込んでいます。

これらの寄木細工のファイルからのデータを にロードし、DatasetとしてキャストしMyTypeます。

org.apache.spark.sql.AnalysisException:column2指定された入力列 ' ' を解決できません: [column1];

column2 データを としてデータセットを作成する方法はありますNoneか?