5

寄木細工のファイルといくつかの後処理関数をループするときに、メモリの問題が原因で Spark がクラッシュしないようにする方法を見つけようとしています。大量のテキストで申し訳ありませんが、これは特定のバグではありません (私は PySpark を使用しています)。これが適切なスタック オーバーフロー フォームに違反している場合は、お詫びします。

基本的な擬似コードは次のとおりです。

#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its  "=" subdirectory
for counter in fileNums:
  sparkDataFrame = sqlContext.read.parquet(counter)
  summaryReportOne = sqlContext.sql.("SELECT.....")
  summaryReportOne.write.partition("id").parquet("/")
  summaryReportTwo = sqlContext.sql.("SELECT....")
  summaryReportTwo.write.partition("id").parquet("/")
  #several more queries, several involving joins, etc....

このコードは Spark SQL クエリを使用しているため、すべての SQL クエリ/関数を使用してラッパー関数を作成し、それを foreach (入力として sparkContext または sqlQuery を使用できない) に渡すことに失敗しました。ループ。

技術的には、これはパーティションを持つ 1 つの大きな寄木細工のファイルですが、一度にすべてを読み込んでクエリするには大きすぎます。各パーティションで関数を実行する必要があります。そのため、PySpark で通常の python ループを実行するだけで、各ループで 1 つの寄木細工のパーティション (サブディレクトリ) を処理し、関連する出力レポートを作成します。

寄木細工のファイル全体のサイズが原因で、すべてのコードを大きな mapPartition() にラップしてもうまくいくかどうかわかりませんか?

しかし、数回ループした後、メモリ エラー、具体的には Java ヒープ エラーが原因でスクリプトがクラッシュします。(ループがクラッシュするファイルについて特別なことは何もないことを確認しました。これは、2 番目または 3 番目のループで読み込まれた任意のファイルで発生します。)

Caused by: com.google.protobuf.ServiceException:     
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at    org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space

Spark がループで実行されることを意図していないことは理解していますが、これらの SQL クエリは、標準の Spark SQL パッケージ関数だけでは少し複雑すぎます。また、さまざまな集計統計について、ファイルごとに複数の概要レポートを書き出します。

基本的に各ループ インデックスの最後にメモリをクリアする方法はありますか? sqlContext.dropTempTable() を使用して登録済みの一時テーブルを削除し、sqlContext.clearCache() を使用してキャッシュをクリアしても効果はありませんでした。sparkContext を停止して各ループで再起動しようとすると、一部のプロセスがまだ「ラップ」されていないため、エラーも発生します (以前はコンテキストを「適切に」停止できたようですが、私は現在の PySpark ドキュメントではこれを見つけることができませんでした。)

また、データフレームを使い終わった後、ループ内のデータフレームで unpersist() を呼び出していないことにも注意してください。各ループ内のデータフレームを書き直すだけです(これは問題の一部である可能性があります)。

私はエンジニアリング チームと協力してメモリ設定を微調整していますが、このスクリプトの 1 つのループを完了するのに十分なメモリを既に割り当てていることがわかっています (1 つのループはエラーなしで実行されます)。

このユースケースでは、Spark よりも優れている可能性のあるツールを含め、あらゆる提案が役に立ちます。Spark バージョン 1.6.1 を使用しています。

4

2 に答える 2

1

更新:各ループでの処理が完了した後に SQL クエリから作成した各テーブルで unpersist() を呼び出すと、ループはメモリの問題なしに次の反復に正常に続行できます。上記のように、.clearCache() と一時テーブルの削除だけでは、うまくいきませんでした。テーブルはsparkSQLクエリからのものでしたが、RDDを返すため、これがうまくいったと思います。

これらの RDD に対して persist() を呼び出しませんでしたが、新しい SQL クエリをこれらの同じ変数名に割り当てることができるように、次のループが始まる前にそれらをクリアするように Spark に指示する必要がありました。

于 2016-05-20T18:49:06.797 に答える
1

可能であれば、新しくリリースされた Spark 2.0 にアップグレードしてみてください。

あなたと同じように、Javaヒープスペースで非常によく似た問題に遭遇していました。spark 1.6.2 で、データフレームを作成して最初に呼び出すプロセスを繰り返すだけで、4G のヒープ領域を超えることができました。

SparkSession を使用する spark 2.0 では、同じプログラムが 1.2 GB のヒープ スペースしか得られず、メモリ使用量は、私が実行していたそのプログラムに期待されるように非常に一貫していました。

于 2016-08-08T15:11:40.440 に答える