spark-csvでソース データをインポートするときに、Spark で後続のより適切に分割された DataFrames の使用を強制する方法を知りたいです。
概要:
spark-csv
インポート時に明示的なパーティション分割をサポートしていないようですsc.textFile()
。- 「無料」で推論されたスキーマを提供しますが、クラスターで8つのエグゼキューターを使用している場合、デフォルトでは、通常は2つのパーティションのみでDataFrameが返されます。
- より多くのパーティションを持つ後続の DataFrame が
cache()
(ソース ファイルのインポート直後に) 経由でキャッシュされ、さらなる処理に使用されているにもかかわらず、Spark ジョブ履歴は依然としてタスクの分散に信じられないほどの偏りを示しています。私が期待するより均一な分布ではなく、タスク。
データを投稿できませんが、コードは単純な結合であり、 を介していくつかの列を追加し、.withColumn()
次に を介して非常に基本的な線形回帰を行いspark.mlib
ます。
以下は、Executor ごとのタスクを示す Spark History UI の比較画像です (最後の行はドライバーです)。
注: DataFrame を呼び出すかどうかrepartition()
に関係なく、同じ偏ったタスク分布が得られます。spark-csv
基本的にこれらの初期 DataFrame を忘れて、より適切に分割された DataFrame から開始するように Spark を「強制」するにはどうすればよいですか? または、spark-csv に何らかの方法で DataFrame を別の方法で分割するように強制するには (フォーク/ソースの変更なし)?
を使用してこの問題を解決できますsc.textFile(file, minPartitions)
が、適切に型指定されたスキーマが提供するなどの理由で、それに頼る必要がないことを願っていspark-csv
ます。