3

更新: この質問を保留してください。これは、Spark 1.5 自体の問題である可能性があることがわかりました。これは、Spark の公式バージョンを使用していないためです。この質問を更新し続けます。ありがとうございました!

最近、Spark-CSV を使用して Spark の DataFrame に CSV をインポートするときに奇妙なバグに気付きました。

ここに私のサンプルコードがあります:

  object sparktry
  {
    def main(args: Array[String])
    {
      AutoLogger.setLevel("INFO")

      val sc = SingletonSparkContext.getInstance()
      val sql_context = SingletonSQLContext.getInstance(sc)

      val options = new collection.mutable.HashMap[String, String]()
      options += "header" -> "true"
      options += "charset" -> "UTF-8"

      val customSchema = StructType(Array(
        StructField("Year", StringType),
        StructField("Brand", StringType),
        StructField("Category", StringType),
        StructField("Model", StringType),
        StructField("Sales", DoubleType)))

      val dataFrame = sql_context.read.format("com.databricks.spark.csv")
      .options(options)
      .schema(customSchema)
      .load("hdfs://myHDFSserver:9000/BigData/CarSales.csv")

      dataFrame.head(10).foreach(x => AutoLogger.info(x.toString))
    }
  }

CarSales は非常に小さな csv です。spark.masterではない場合、16GB 以上localに設定spark.executor.memoryすると DataFrame が破損することに気付きました。このプログラムの出力は次のようになります: (ログからテキストをコピーしました。この場合spark.executor.memoryは 32GB に設定されています)

16/03/07 12:39:50.190 INFO DAGScheduler: Job 1 finished: head at sparktry.scala:35, took 8.009183 s
16/03/07 12:39:50.225 INFO AutoLogger$: [       ,  ,      ,ries       ,142490.0]
16/03/07 12:39:50.225 INFO AutoLogger$: [       ,  ,      ,ries       ,112464.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,90960.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,100910.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,94371.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,54142.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,       ,ries       ,14773.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,       ,ries       ,12276.0]
16/03/07 12:39:50.227 INFO AutoLogger$: [       ,  ,       ,ries       ,9254.0]
16/03/07 12:39:50.227 INFO AutoLogger$: [       ,  ,       ,ries       ,12253.0]

ファイルの最初の 10 行は次のとおりです。

1/1/2007,BMW,Compact,BMW 3-Series,142490.00
1/1/2008,BMW,Compact,BMW 3-Series,112464.00
1/1/2009,BMW,Compact,BMW 3-Series,90960.00
1/1/2010,BMW,Compact,BMW 3-Series,100910.00
1/1/2011,BMW,Compact,BMW 3-Series,94371.00
1/1/2007,BMW,Compact,BMW 5-Series,54142.00
1/1/2007,BMW,Fullsize,BMW 7-Series,14773.00
1/1/2008,BMW,Fullsize,BMW 7-Series,12276.00
1/1/2009,BMW,Fullsize,BMW 7-Series,9254.00
1/1/2010,BMW,Fullsize,BMW 7-Series,12253.00

spark.executor.memory私のマシンで 16GB に変更しただけでは最初の 10 行は正しいのですが、16GB を超える設定にすると破損することに気付きました。

さらに、256 GB のメモリを搭載したサーバーの 1 つで、これを 16 GB に設定すると、このバグも発生します。代わりに、48GB に設定すると正常に動作します。さらに、印刷しようとしましdataFrame.rddたが、RDD の内容が正しいことを示していますが、データフレーム自体はそうではありません。

誰でもこの問題について何か考えがありますか?

ありがとうございました!

4

2 に答える 2

0

私はあなたのコードを実行し、Spark のデフォルト設定で hdfs から csv データを取得できました。

以下の行のコードを更新しました:

val conf = new org.apache.spark.SparkConf().setMaster("local[2]").setAppName("HDFSReadDemo");
val sc = new org.apache.spark.SparkContext(conf); 
val sql_context = new org.apache.spark.sql.SQLContext(sc) 

そして、ロガーの代わりに println() 。

dataFrame.head(10).foreach(x => println(x))

したがって、Spark メモリ構成 (つまり、spark.executor.memory) に問題はありません。

于 2016-03-07T15:11:29.580 に答える