0

私はsparkを使用してkaggleでこの問題を解決しようとしています:

入力の階層は次のようになります。

drivers/{driver_id}/trip#.csv
e.g., drivers/1/1.csv
      drivers/1/2.csv
      drivers/2/1.csv

親ディレクトリ「drivers」を読み取り、各サブディレクトリに対して、キーを( sub_directory ,file_name)として、値をファイルのコンテンツとしてペアRDD を作成したいと考えています。

このリンク を確認して使用しようとしました

val text = sc.wholeTextFiles("drivers")
text.collect()

これはエラーで失敗しました:

java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:591)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:283)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:243)
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:884)

しかし、以下のコードを実行すると動作します。

val text =  sc.wholeTextFiles("drivers/1")
text.collect()

ここでは、ディレクトリドライバーを読み取ってファイルをループし、エントリごとにwholeTextFilesを呼び出す必要があるためです。

4

1 に答える 1

1

使用する代わりに

sc.textfile("path/*/**") or sc.wholeTextFiles("path/*")

このコードを使用できます。Spark はフォルダーとサブフォルダーのすべての可能な値を内部的に一覧表示するため、大規模なデータセットでは時間がかかる可能性があります。その代わりに、同じ目的でユニオンを使用できます。

場所を含むこの List オブジェクトを次のコードに渡します。注意: sc は SQLContext のオブジェクトです。

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

これで、最終的な統合RDD、つまりdfが得られました

于 2015-08-20T12:04:28.647 に答える