問題タブ [spark-csv]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Spark: 以降の DataFrame での spark-csv パーティショニングと並列処理
spark-csvでソース データをインポートするときに、Spark で後続のより適切に分割された DataFrames の使用を強制する方法を知りたいです。
概要:
spark-csv
インポート時に明示的なパーティション分割をサポートしていないようですsc.textFile()
。- 「無料」で推論されたスキーマを提供しますが、クラスターで8つのエグゼキューターを使用している場合、デフォルトでは、通常は2つのパーティションのみでDataFrameが返されます。
- より多くのパーティションを持つ後続の DataFrame が
cache()
(ソース ファイルのインポート直後に) 経由でキャッシュされ、さらなる処理に使用されているにもかかわらず、Spark ジョブ履歴は依然としてタスクの分散に信じられないほどの偏りを示しています。私が期待するより均一な分布ではなく、タスク。
データを投稿できませんが、コードは単純な結合であり、 を介していくつかの列を追加し、.withColumn()
次に を介して非常に基本的な線形回帰を行いspark.mlib
ます。
以下は、Executor ごとのタスクを示す Spark History UI の比較画像です (最後の行はドライバーです)。
注: DataFrame を呼び出すかどうかrepartition()
に関係なく、同じ偏ったタスク分布が得られます。spark-csv
基本的にこれらの初期 DataFrame を忘れて、より適切に分割された DataFrame から開始するように Spark を「強制」するにはどうすればよいですか? または、spark-csv に何らかの方法で DataFrame を別の方法で分割するように強制するには (フォーク/ソースの変更なし)?
を使用してこの問題を解決できますsc.textFile(file, minPartitions)
が、適切に型指定されたスキーマが提供するなどの理由で、それに頼る必要がないことを願っていspark-csv
ます。
sql - str の形式が dd/mm/yyyy の場合、列の型を str から date に変換する方法は?
大きなcsvファイルからインポートしたSQLに大きなテーブルがあります。
列に dd/mm/yyyy 形式の日付情報が含まれている場合、その列は str として認識されます。
TO_DATE関数にはyyyy-mm-dd形式が必要なため、試しselect TO_DATE('12/31/2015') as date
ましたが機能しません。
'12/31/2015' 文字列を SQL 内で '2015-12-31' 形式に再配置して、列の型を日付に変換するにはどうすればよいですか?
update
sql のキーワードがサポートされていないように見えるデータのサイズが非常に大きいため、sparkSQL (databricks 環境) でこれを行っています。
apache-spark - spark-csv を使用して、文字列として表された CSV を Apache Spark に読み込むことはできますか
spark-csv ( https://github.com/databricks/spark-csv )を使用して csv ファイルを spark に読み込む方法は知っていますが、既に csv ファイルを文字列として表しており、この文字列を直接変換したいと考えています。データフレーム。これは可能ですか?
scala - EMR にカスタム Spark ブランチをインストールするには?
AWS EMR に、デフォルトの Spark ではなく、git リポジトリからの特定の Spark ブランチを持たせたいと考えています。
これを行う理由は、csv の null 値を修正したブランチを使用したいからです。
scala - bzip2 csv データでデータフレーム ユニオンを実行すると、範囲外のインデックス エラーが発生する
問題はかなり奇妙です。圧縮されていないファイルを使用する場合、問題はありません。しかし、圧縮された bz2 ファイルを使用すると、インデックス範囲外エラーが発生します。
私が読んだことから、行末文字を検出せず、全体を巨大な行として読み取るのは明らかにspark-csvパーサーです。圧縮されていないcsvでは機能するが、.csv.bz2ファイルでは機能しないという事実は、私にとってかなり奇妙です。
また、私が言ったように、データフレーム ユニオンを実行するときにのみ発生します。スパークコンテキストでrddユニオンを実行しようとしましたが、同じエラーです。