7

Java Spark アプリ (実際にはいくつかの計算に RDD を使用します) をDatasetsの代わりに使用するように調整したいと考えていますRDDs。私はデータセットを初めて使用し、対応するデータセット操作にどのトランザクションをマップするかわかりません。

現時点では、次のようにマッピングします。

JavaSparkContext.textFile(...)                       -> SQLContext.read().textFile(...)
JavaRDD.filter(Function)                             -> Dataset.filter(FilterFunction)
JavaRDD.map(Function)                                -> Dataset.map(MapFunction)
JavaRDD.mapToPair(PairFunction)                      -> Dataset.groupByKey(MapFunction) ???
JavaPairRDD.aggregateByKey(U, Function2, Function2)  -> KeyValueGroupedDataset.???

対応する質問は次のとおりです。

  • JavaRDD.mapToPairメソッドに等しいDataset.groupByKey
  • JavaPairRDDにマップしKeyValueGroupedDatasetますか?
  • メソッドと等しいメソッドはどれJavaPairRDD.aggregateByKeyですか?

ただし、次の RDD コードをデータセットに移植したいと考えています。

JavaRDD<Article> goodRdd = ...

JavaPairRDD<String, Article> ArticlePairRdd = goodRdd.mapToPair(new PairFunction<Article, String, Article>() {              // Build PairRDD<<Date|Store|Transaction><Article>>
    public Tuple2<String, Article> call(Article article) throws Exception {
        String key = article.getKeyDate() + "|" + article.getKeyStore() + "|" + article.getKeyTransaction() + "|" + article.getCounter();
        return new Tuple2<String, Article>(key, article);
    }
});

JavaPairRDD<String, String> transactionRdd = ArticlePairRdd.aggregateByKey("",                                              // Aggregate distributed data -> PairRDD<String, String>
    new Function2<String, Article, String>() {
        public String call(String oldString, Article newArticle) throws Exception {
            String articleString = newArticle.getOwg() + "_" + newArticle.getTextOwg();                                     // <<Date|Store|Transaction><owg_textOwg###owg_textOwg>>
            return oldString + "###" + articleString;
        }
    }, 
    new Function2<String, String, String>() {
        public String call(String a, String b) throws Exception {
            String c = a.concat(b);
            ...
            return c;
        }
    }
);

私のコードはまだこれに見えます:

Dataset<Article> goodDS = ...

KeyValueGroupedDataset<String, Article> ArticlePairDS = goodDS.groupByKey(new MapFunction<Article, String>() {
    public String call(Article article) throws Exception {
        String key = article.getKeyDate() + "|" + article.getKeyStore() + "|" + article.getKeyTransaction() + "|" + article.getCounter();
        return key;
    }
}, Encoders.STRING());

// here I need something similar to aggregateByKey! Not reduceByKey as I need to return another data type (String) than I have before (Article)
4

0 に答える 0