3

以下のコードでは、値を結合しようとしています:

val rdd: org.apache.spark.rdd.RDD[((String), Double)] =
    sc.parallelize(List(
      (("a"), 1.0),
      (("a"), 3.0),
      (("a"), 2.0)
      ))

val reduceByKey = rdd.reduceByKey((a , b) => String.valueOf(a) + String.valueOf(b))

reduceByValue(a , 1,3,2) を含める必要がありますが、コンパイル時エラーが発生します:

Multiple markers at this line - type mismatch; found : String required: Double - type mismatch; found : String 
 required: Double

reduce 関数の型を決定するものは何ですか? 型変換できないの?

同じ結果を得るために使用できますgroupByKeyが、理解したいだけですreduceByKey

4

2 に答える 2

7

RDD[(K,V)]いいえ、 typeのrdd を指定すると、 typereduceByKeyの連想関数を使用し(V,V) => Vます。

値の型を別の任意の型に変更するリダクションを適用したい場合は、次を使用できますaggregateByKey

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)

関数と 関数を使用するzeroValueと、reduceByKey が行うようにseqOp、関連付け関数combOpが の結果を最終結果に結合しながら、マップ側で折り畳みのような操作を提供します。seqOp署名からわかるように、コレクションの値は型ですがV、結果はaggregateByKey任意の型になりますU

上記の例に適用すると、次のように aggregateByKeyなります。

rdd.aggregateByKey("")({case (aggr , value) => aggr + String.valueOf(value)}, (aggr1, aggr2) => aggr1 + aggr2)
于 2014-12-17T21:35:53.703 に答える
1

コードの問題は、値の型が一致しないことです。RDD で値の型を変更した場合、reduceByKey で同じ出力を得ることができます。

val rdd: org.apache.spark.rdd.RDD[((String), String)] =
    sc.parallelize(List(
      ("a", "1.0"),
      ("a", "3.0"),
      ("a", "2.0")
      ))

    val reduceByKey = rdd.reduceByKey((a , b) => a.concat(b))

これが同じ例です。reduceByKey に渡す関数が Value (この場合は Double ) 型の 2 つのパラメーターを取り、同じ型の単一のパラメーターを返す限り、reduceByKey は機能します。

于 2015-01-28T08:18:54.133 に答える