6

私は Spark で遊んでいましたが、なんとかデータをクランチすることができました。私のデータは、50 列と約 2000 万行からなるフラット区切りのテキスト ファイルで構成されています。各列を処理するスカラ スクリプトがあります。

並列処理に関しては、RDD 操作が複数のノードで実行されることを知っています。したがって、列を処理するたびに、それらは並行して処理されますが、列自体は順次処理されます。

簡単な例: 私のデータが 5 列のテキスト区切りファイルで、各列にテキストが含まれていて、各列の単語数をカウントしたい場合。私はするだろう:

for(i <- 0 until 4){
   data.map(_.split("\t",-1)(i)).map((_,1)).reduce(_+_)
}

各列の操作は並行して実行されますが、列自体は順次処理されます (悪い言い回しです。申し訳ありません!)。つまり、列 1 が完了した後に列 2 が処理されます。列 3 は、列 1 と 2 が完了した後に処理されます。

私の質問は、一度に複数の列を処理する方法はありますか? チュートリアルなどの方法をご存知でしたら、私と共有していただけませんか?

ありがとうございました!!

4

2 に答える 2

3

入力が seq であるとします。以下は、列を同時に処理するために行うことができます。基本的な考え方は、シーケンス (列、入力) をキーとして使用することです。

scala> val rdd = sc.parallelize((1 to 4).map(x=>Seq("x_0", "x_1", "x_2", "x_3")))
rdd: org.apache.spark.rdd.RDD[Seq[String]] = ParallelCollectionRDD[26] at parallelize at <console>:12

scala> val rdd1 = rdd.flatMap{x=>{(0 to x.size - 1).map(idx=>(idx, x(idx)))}}
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = FlatMappedRDD[27] at flatMap at <console>:14

scala> val rdd2 = rdd1.map(x=>(x, 1))
rdd2: org.apache.spark.rdd.RDD[((Int, String), Int)] = MappedRDD[28] at map at <console>:16

scala> val rdd3 = rdd2.reduceByKey(_+_)
rdd3: org.apache.spark.rdd.RDD[((Int, String), Int)] = ShuffledRDD[29] at reduceByKey at <console>:18

scala> rdd3.take(4)
res22: Array[((Int, String), Int)] = Array(((0,x_0),4), ((3,x_3),4), ((2,x_2),4), ((1,x_1),4))

出力例: ((0, x_0), 4) は最初の列を意味し、キーは x_0、値は 4 です。ここから開始して、さらに処理を進めることができます。

于 2014-10-11T05:41:28.003 に答える
1

scala parallize collection 機能を使用する次のコードを試すことができます。

(0 until 4).map(index => (index,data)).par.map(x => {
    x._2.map(_.split("\t",-1)(x._1)).map((_,1)).reduce(_+_)
}

データは参照であるため、データを複製してもそれほどコストはかかりません。また、rdd は読み取り専用であるため、並列処理が機能します。parメソッドは、並列収集機能を使用します。Spark Web UI で並列ジョブを確認できます。

于 2015-04-25T13:12:46.623 に答える