0

戻ろうとしRDD[(String,String,String)]ていますが、 を使用してそれを行うことができませんflatMap。試し(tweetId, tweetBody, gender)てみ(tweetId, tweetBody, gender)ましたが、タイプの不一致のエラーが表示さRDD[(String, String, String)]れますflatMap

override def transform(sqlContext: SQLContext, rdd: RDD[Array[Byte]], config: UserTransformConfig, logger: PhaseLogger): DataFrame = {
    val idColumnName = config.getConfigString("column_name").getOrElse("id")
    val bodyColumnName = config.getConfigString("column_name").getOrElse("body")
    val genderColumnName = config.getConfigString("column_name").getOrElse("gender")

    // convert each input element to a JsonValue
    val jsonRDD = rdd.map(r => byteUtils.bytesToUTF8String(r))

    val hashtagsRDD: RDD[(String,String, String)] = jsonRDD.mapPartitions(r => {
      // register jackson mapper (this needs to be instantiated per partition
      // since it is not serializable)
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)

      r.flatMap(tweet => tweet match {
        case _ :: tweet =>
        val rootNode = mapper.readTree(tweet)
        val tweetId = rootNode.path("id").asText.split(":")(2)
        val tweetBody = rootNode.path("body").asText
        val tweetVector =  new HashingTF().transform(tweetBody.split(" "))
        val result =genderModel.predict(tweetVector)
        val gender = if(result == 1.0){"Male"}else{"Female"}

        (tweetId, tweetBody, gender)
       // Array(1).map(x => (tweetId, tweetBody, gender))

      })

    })

    val rowRDD: RDD[Row] = hashtagsRDD.map(x => Row(x._1,x._2,x._3))
    val schema = StructType(Array(StructField(idColumnName,StringType, true),StructField(bodyColumnName, StringType, true),StructField(genderColumnName,StringType, true)))
    sqlContext.createDataFrame(rowRDD, schema)
  }
}
4

1 に答える 1

0

mapの代わりに使用してみてくださいflatMapflatMapパラメータ関数の結果の型がコレクションまたはRDD

つまりflatMap、現在のコレクションのすべての要素が 0 個以上の要素にマップされている場合に使用されます。Whilemapは、現在のコレクションのすべての要素が正確に 1 つの要素にマップされている場合に使用されます。

map withは、関数型の記号と記号をA => B交換します。つまり、に変換します。ABRDD[A]RDD[B]

flatMapモナド型ではmapそのflattenように読むことができます。たとえば、 and があり、パラメーター関数はsimple の結果の型で あり、その出現のペアは単に経由するように単純化できますRDD[A]A => RDD[B]mapRDD[RDD[B]]RDD[B]flatten

正常にコンパイルされたコードの例を次に示します。

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class UserTransformConfig {
  def getConfigString(name: String): Option[String] = ???
}

class PhaseLogger
object byteUtils {
  def bytesToUTF8String(r: Array[Byte]): String = ???
}

class HashingTF {
  def transform(strs: Array[String]): Array[Double] = ???
}

object genderModel {
  def predict(v: Array[Double]): Double = ???
}

def transform(sqlContext: SQLContext, rdd: RDD[Array[Byte]], config: UserTransformConfig, logger: PhaseLogger): DataFrame = {
  val idColumnName = config.getConfigString("column_name").getOrElse("id")
  val bodyColumnName = config.getConfigString("column_name").getOrElse("body")
  val genderColumnName = config.getConfigString("column_name").getOrElse("gender")

  // convert each input element to a JsonValue
  val jsonRDD = rdd.map(r => byteUtils.bytesToUTF8String(r))

  val hashtagsRDD: RDD[(String, String, String)] = jsonRDD.mapPartitions(r => {
    // register jackson mapper (this needs to be instantiated per partition
    // since it is not serializable)
    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)

    r.map { tweet =>
      val rootNode = mapper.readTree(tweet)
      val tweetId = rootNode.path("id").asText.split(":")(2)
      val tweetBody = rootNode.path("body").asText
      val tweetVector = new HashingTF().transform(tweetBody.split(" "))
      val result = genderModel.predict(tweetVector)
      val gender = if (result == 1.0) {"Male"} else {"Female"}

      (tweetId, tweetBody, gender)

    }
  })

  val rowRDD: RDD[Row] = hashtagsRDD.map(x => Row(x._1, x._2, x._3))
  val schema = StructType(Array(StructField(idColumnName, StringType, true), StructField(bodyColumnName, StringType, true), StructField(genderColumnName, StringType, true)))
  sqlContext.createDataFrame(rowRDD, schema)
}

最小限の例を提供しなかったため、私の想像力からどれだけ引き出す必要があるかを確認してください。一般的に、このような質問は答える価値がありません

于 2015-11-16T14:58:45.703 に答える