2

私はRDD次の構造を持っています:

val rdd = RDD[ (category: String, product: String, score: Double) ]

私の目的はgroup、カテゴリに基づいてデータを取得することです。次に、カテゴリごとにsortの wrt スコアを取得しますTuple 2 (product, score)。今のところ私のコードは次のとおりです。

val result = rdd.groupByKey.mapValues(v => v.toList.sortBy(-_._2))

これは、私が持っているデータに対して非常に高価な操作であることがわかりました。別のアプローチを使用してパフォーマンスを改善しようとしています。

4

2 に答える 2

3

データセットを知らずに答えるのは難しいですが、ドキュメントにはパフォーマンスに関するいくつかの手がかりがありますgroupByKey:

注: この操作は非常にコストがかかる場合があります。各キーに対して集計 (合計や平均など) を実行するためにグループ化する場合は、PairRDDFunctions.aggregateByKey または PairRDDFunctions.reduceByKey を使用すると、パフォーマンスが大幅に向上します。

したがって、ソートされたリストで何をするつもりかによって異なります。すべてのリスト全体が必要な場合は、改善が難しい場合がありますgroupByKey。何らかの集計を実行している場合は、上記の代替操作 ( aggregateByKeyreduceByKey) の方が適している場合があります。

リストのサイズによっては、並べ替えの前に別のコレクション (変更可能な配列など) を使用する方が効率的な場合があります。

編集: カテゴリの数が比較的少ない場合は、元の RDD を繰り返しフィルター処理し、フィルター処理された各 RDD を並べ替えることができます。全体的には同様の量の作業が行われますが、特定の瞬間に使用するメモリが少なくなる場合があります。

編集 2 : メモリ不足が問題になる場合は、カテゴリと製品を文字列ではなく整数 ID として表し、後で名前を検索するだけでよい場合があります。このようにして、メインの RDD をはるかに小さくすることができます。

于 2016-04-29T15:39:29.790 に答える
0

あなたの RDD はカテゴリごとに公平に分散されていますか? スキュー要因によっては問題が発生する可能性があります。キー値が多すぎない場合は、次のようにしてみてください。

val rdd: RDD[(String, String, Double)] = sc.parallelize(Seq(("someCategory","a",1.0),("someCategory","b",3.0),("someCategory2","c",4.0)))

rdd.keyBy(_._1).countByKey().foreach(println)
于 2016-04-29T15:45:37.650 に答える