4

Spark & Scala を始めたばかりです

複数のファイルを含むディレクトリがあり、それらを使用して正常にロードしました

sc.wholeTextFiles(directory)

今はもう一つ上のレベルに行きたいと思っています。実際には、ファイルを含むサブディレクトリを含むディレクトリがあります。私の目標は、ファイルの名前と内容を表すを取得しRDD[(String,String)]て、先に進むことです。RDD

私は次のことを試しました:

val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))

しかし、私はこれをに変換するにはどうSeq[RDD[(String,String)]] すればよいですか?SeqRDD[(String,String)]

それとも、私は正しいことをしていないので、別のアプローチを試す必要がありますか?

編集:コードを追加

// HADOOP VERSION
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"

// returns a list of folders (currently about 800)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS)
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// RDD[(String,String)]
//    val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1)
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1)

// returns org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
println(inputHDFS2.count)
4

3 に答える 3

4

このように減らすことができますSeq( と を連結RDDします++):

val reduced: RDD[(String, String)] = input.reduce((left, right) => left ++ right)

reduceを適用できる理由をさらに詳しく説明します。

  • ++連想的です-rdda ++(rddb ++ rddc)または(rdda ++ rddb)++ rddcは関係ありません
  • が空でないことを前提としていSeqます (それ以外の場合は、初期アキュムレータとしてfold空が必要になります)。RDD[(String, String)]

の正確なタイプによってはSeq、スタックオーバーフローが発生する可能性があるため、注意してより大きなコレクションでテストしてください。ただし、標準ライブラリの場合は安全だと思います。

于 2014-12-31T15:26:27.573 に答える
3

unionスパークコンテキストによって提供されたものを使用する必要があります

val rdds: Seq[RDD[Int]] = (1 to 100).map(i => sc.parallelize(Seq(i)))
val rdd_union: RDD[Int] = sc.union(rdds) 
于 2016-05-05T11:11:03.193 に答える
2

各ディレクトリを個別のRDDにロードする代わりに、パスワイルドカードを使用してすべてのディレクトリを単一のRDDにロードできますか?

次のディレクトリ ツリーを考えると...

$ tree test/spark/so
test/spark/so
├── a
│   ├── text1.txt
│   └── text2.txt
└── b
    ├── text1.txt
    └── text2.txt

ディレクトリのワイルドカードを使用して RDD を作成します。

scala> val rdd =  sc.wholeTextFiles("test/spark/so/*/*")
rdd: org.apache.spark.rdd.RDD[(String, String)] = test/spark/so/*/ WholeTextFileRDD[16] at wholeTextFiles at <console>:37

ご想像のとおり、カウントは 4 です。

scala> rdd.count
res9: Long = 4

scala> rdd.collect
res10: Array[(String, String)] =
Array((test/spark/so/a/text1.txt,a1
a2
a3), (test/spark/so/a/text2.txt,a3
a4
a5), (test/spark/so/b/text1.txt,b1
b2
b3), (test/spark/so/b/text2.txt,b3
b4
b5))
于 2014-12-31T16:09:56.357 に答える