問題タブ [pyspark-sql]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
pyspark - Python Spark トランザクションをネストされたスキーマにグループ化
pyspark.sql.dataframe.DataFrame
" " に保存されているトランザクションを、トランザクションのソース (この場合は顧客 ID) を示すddf
列 " "でグループ化したいと考えています。key
グループ化は非常にコストのかかるプロセスであるため、ネストされたスキーマでグループをディスクに書き込みたいと考えています。
これにより、すべてのトランザクションをキーにすばやくロードし、グループ化を再実行することなく複雑なカスタム アグリゲーターを開発できます。
ネストされたスキーマを作成してディスクに書き込むにはどうすればよいですか?
group-by - ユーザー定義関数を使用した Pyspark データ フレーム集約
ユーザー定義関数で「groupby(key).agg(」を使用するにはどうすればよいですか?具体的には、キーごとにすべての一意の値のリストが必要です [カウントしない]。
apache-spark - csv ファイルを Spark データフレームにインポートする
pyspark を使用して csv ファイルをインポートしようとしています。これとこれを試しました。
最初の方法を使用して、csv ファイルを読み取ることができました。しかし、変数の数はかなり多いです。そのため、変数名を手動で言及するのは困難です。
2 番目の方法 (spark-csv) を使用すると、コマンド プロンプトを使用して csv ファイルを読み取ることができました。しかし、Jupyter ノートブックで同じ方法を使用しようとすると、エラーが発生します。
このオプションも疲れました。「conf」ファイルを修正しました。しかし、Windows環境で「PACKAGES」と「PYSPARK_SUBMIT_ARGS」を設定する方法がわかりません。
Sparkデータフレームでcsvファイルを読み取る方法を教えてください。
ありがとう!
apache-spark-sql - ParquetFileReader の並列処理を構成するには?
パーティション分割された spark sql データフレーム (約 300 パーティション) を hdfs に保存しようとしていますが、非常に遅いです:
Joined_table.write.mode('overwrite').partitionBy(target).save(path_out)
これは次のことが原因である可能性があると思います。
INFO ParquetFileReader: 並列処理を使用してアクションを開始しています: 5
この並列処理を構成する方法について何か考えはありますか? このリンクが役立つ場合があります: https://forums.databricks.com/questions/1097/stall-on-loading-many-parquet-files-on-s3.html ですが、Pyspark で newAPIHadoopFile を使用する方法がわかりません。
loops - スクリプトで Spark parquet ファイルを反復/ループすると、メモリ エラー/ビルドアップが発生します (Spark SQL クエリを使用)。
寄木細工のファイルといくつかの後処理関数をループするときに、メモリの問題が原因で Spark がクラッシュしないようにする方法を見つけようとしています。大量のテキストで申し訳ありませんが、これは特定のバグではありません (私は PySpark を使用しています)。これが適切なスタック オーバーフロー フォームに違反している場合は、お詫びします。
基本的な擬似コードは次のとおりです。
このコードは Spark SQL クエリを使用しているため、すべての SQL クエリ/関数を使用してラッパー関数を作成し、それを foreach (入力として sparkContext または sqlQuery を使用できない) に渡すことに失敗しました。ループ。
技術的には、これはパーティションを持つ 1 つの大きな寄木細工のファイルですが、一度にすべてを読み込んでクエリするには大きすぎます。各パーティションで関数を実行する必要があります。そのため、PySpark で通常の python ループを実行するだけで、各ループで 1 つの寄木細工のパーティション (サブディレクトリ) を処理し、関連する出力レポートを作成します。
寄木細工のファイル全体のサイズが原因で、すべてのコードを大きな mapPartition() にラップしてもうまくいくかどうかわかりませんか?
しかし、数回ループした後、メモリ エラー、具体的には Java ヒープ エラーが原因でスクリプトがクラッシュします。(ループがクラッシュするファイルについて特別なことは何もないことを確認しました。これは、2 番目または 3 番目のループで読み込まれた任意のファイルで発生します。)
Spark がループで実行されることを意図していないことは理解していますが、これらの SQL クエリは、標準の Spark SQL パッケージ関数だけでは少し複雑すぎます。また、さまざまな集計統計について、ファイルごとに複数の概要レポートを書き出します。
基本的に各ループ インデックスの最後にメモリをクリアする方法はありますか? sqlContext.dropTempTable() を使用して登録済みの一時テーブルを削除し、sqlContext.clearCache() を使用してキャッシュをクリアしても効果はありませんでした。sparkContext を停止して各ループで再起動しようとすると、一部のプロセスがまだ「ラップ」されていないため、エラーも発生します (以前はコンテキストを「適切に」停止できたようですが、私は現在の PySpark ドキュメントではこれを見つけることができませんでした。)
また、データフレームを使い終わった後、ループ内のデータフレームで unpersist() を呼び出していないことにも注意してください。各ループ内のデータフレームを書き直すだけです(これは問題の一部である可能性があります)。
私はエンジニアリング チームと協力してメモリ設定を微調整していますが、このスクリプトの 1 つのループを完了するのに十分なメモリを既に割り当てていることがわかっています (1 つのループはエラーなしで実行されます)。
このユースケースでは、Spark よりも優れている可能性のあるツールを含め、あらゆる提案が役に立ちます。Spark バージョン 1.6.1 を使用しています。