`RDD[(String,String,String)]` を返そうとしていますが、`flatMap` を使用してそれを行うことができません。
`(tweetId, tweetBody, gender)` や `(tweetId, tweetBody, gender)` を試してみましたが、型の不一致のエラーが表示されます。期待される型は `RDD[(String, String, String)]` で、エラーは `flatMap` の箇所で発生します。
/**
 * Builds a DataFrame of (tweet id, tweet body, predicted gender) from raw tweet records.
 *
 * Each input element is decoded as a UTF-8 string and parsed as a JSON document. The
 * document's `body` text is split on spaces, hashed into a term-frequency vector and
 * passed to `genderModel.predict`; a prediction of 1.0 is labelled "Male", anything
 * else "Female".
 *
 * @param sqlContext context used to create the resulting DataFrame
 * @param rdd        raw records; each element is decoded as UTF-8 and parsed as JSON
 * @param config     supplies the output column name(s) through the "column_name" key
 * @param logger     not used by this method
 * @return a DataFrame with three nullable string columns: id, body and gender
 */
override def transform(sqlContext: SQLContext, rdd: RDD[Array[Byte]], config: UserTransformConfig, logger: PhaseLogger): DataFrame = {
  // NOTE(review): all three lookups read the same "column_name" key, so when that key is
  // configured every output column receives the same name. Distinct keys were probably
  // intended -- confirm against the configuration contract before changing.
  val idColumnName = config.getConfigString("column_name").getOrElse("id")
  val bodyColumnName = config.getConfigString("column_name").getOrElse("body")
  val genderColumnName = config.getConfigString("column_name").getOrElse("gender")

  // Decode each raw record into a UTF-8 JSON string.
  val jsonRDD = rdd.map(r => byteUtils.bytesToUTF8String(r))

  val predictionsRDD: RDD[(String, String, String)] = jsonRDD.mapPartitions { tweets =>
    // The Jackson mapper is not serializable, so it is instantiated once per partition
    // instead of being captured from the driver.
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    // The hashing transformer is loop-invariant; build it once per partition as well.
    val hashingTF = new HashingTF()

    // Every tweet yields exactly one tuple, so `map` is the right combinator. `flatMap`
    // requires the function to return a collection, which is why returning a bare tuple
    // from it failed with a type mismatch.
    tweets.map { tweet =>
      val rootNode = mapper.readTree(tweet)
      // The third ':'-separated segment of `id` is used as the tweet id; an id with fewer
      // than three segments throws ArrayIndexOutOfBoundsException here.
      val tweetId = rootNode.path("id").asText.split(":")(2)
      val tweetBody = rootNode.path("body").asText
      val tweetVector = hashingTF.transform(tweetBody.split(" "))
      val result = genderModel.predict(tweetVector)
      val gender = if (result == 1.0) "Male" else "Female"
      (tweetId, tweetBody, gender)
    }
  }

  val rowRDD: RDD[Row] = predictionsRDD.map { case (id, body, gender) => Row(id, body, gender) }
  val schema = StructType(Array(
    StructField(idColumnName, StringType, true),
    StructField(bodyColumnName, StringType, true),
    StructField(genderColumnName, StringType, true)
  ))
  sqlContext.createDataFrame(rowRDD, schema)
}
}