Spark & Scala を始めたばかりです
複数のファイルを含むディレクトリがあり、それらを使用して正常にロードしました
sc.wholeTextFiles(directory)
今はもう一つ上のレベルに行きたいと思っています。実際には、ファイルを含むサブディレクトリを含むディレクトリがあります。私の目標は、ファイルの名前と内容を表すを取得しRDD[(String,String)]
て、先に進むことです。RDD
私は次のことを試しました:
val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))
しかし、私はこれをに変換するにはどうSeq[RDD[(String,String)]]
すればよいですか?Seq
RDD[(String,String)]
それとも、私は正しいことをしていないので、別のアプローチを試す必要がありますか?
編集:コードを追加
// HADOOP VERSION
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"
// returns a list of folders (currently about 800)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS)
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// RDD[(String,String)]
// val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1)
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1)
// returns org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
println(inputHDFS2.count)