1

csv 行のセットを特定の列でグループ化し、各グループで何らかの処理を行う必要があります。

    JavaRDD<String> lines = sc.textFile
                        ("somefile.csv");
                JavaPairRDD<String, String> pairRDD = lines.mapToPair(new SomeParser());
                List<String> keys = pairRDD.keys().distinct().collect();
                for (String key : keys)
                {
                List<String> rows = pairRDD.lookup(key);

            noOfVisits = rows.size();
            country = COMMA.split(rows.get(0))[6];
            accessDuration = getAccessDuration(rows,timeFormat);
            Map<String,Integer> counts = getCounts(rows);
            whitepapers = counts.get("whitepapers");
            tutorials = counts.get("tutorials");
            workshops = counts.get("workshops");
            casestudies = counts.get("casestudies");
            productPages = counts.get("productpages");        
            }

    private static long dateParser(String dateString) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("MMM dd yyyy HH:mma");
        Date date = format.parse(dateString);
        return date.getTime();
    }
dateParser is called for each row. Then min and max for the group is calculated to get the access duration. Others are string matches.

pairRDD.lookup は非常に遅いです.spark でこれを行うためのより良い方法はありますか?

4

1 に答える 1

3

その列をキーとして単純に使用して、groupByKey. これらの行の操作については言及されていません。それらの行を何らかの方法で結合する関数であれば、 を使用することもできますreduceByKey

何かのようなもの:

import org.apache.spark.SparkContext._  // implicit pair functions
val pairs = lines.map(parser _)
val grouped = pairs.groupByKey
// here grouped is of the form: (key, Iterator[String])

*編集* プロセスを見た後、各行をそれが寄与するデータにマップし、aggregateByKeyそれらをすべて合計に減らすために使用する方が効率的だと思います。 aggregateByKey2 つの関数とゼロを取ります。

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)]

最初の関数はパーティション アグリゲーターであり、ローカル パーティションを効率的に実行し、パーティションごとにローカルに集計されたパーシャルを作成します。CombineOperation は、これらの部分的な集計を取得し、それらを組み合わせて最終結果を取得します。

このようなもの:

val lines = sc.textFile("somefile.csv")
// parse returns a key and a decomposed Record of values tracked:(key, Record("country", timestamp,"whitepaper",...)) 

val records = lines.map(parse(_))

val totals = records.aggregateByKey((0,Set[String].empty,Long.MaxValue, Long.MinValue, Map[String,Int].empty),
(record, (count, countrySet, minTime, maxTime, counterMap )) => (count+1,countrySet + record.country, math.min(minTime,record.timestamp), math.max(maxTime, record.timestamp), ...)
(cumm1, cumm2) => ???  // add each field of the cummulator
) 

これは、Spark でキーベースの集計を行う最も効率的な方法です。

于 2014-10-28T15:30:40.820 に答える