カスタム ケース クラスを使用して、Spark (1.4.0) を使用して Cassandra (2.1.6) に書き込むのに苦労しています。spark-cassandra-connector 1.4.0-M1
これまでのところ、DataStaxと次のケース クラスを使用してこれを試しました。
case class Event(event_id: String, event_name: String, event_url: String, time: Option[Long])
[...]
case class RsvpResponse(event: Event, group: Group, guests: Long, member: Member, mtime: Long, response: String, rsvp_id: Long, venue: Option[Venue])
これを機能させるために、次のコンバーターも実装しました。
implicit object EventToUDTValueConverter extends TypeConverter[UDTValue] {
def targetTypeTag = typeTag[UDTValue]
def convertPF = {
case e: Event => UDTValue.fromMap(toMap(e)) // toMap just transforms the case class into a Map[String, Any]
}
}
TypeConverter.registerConverter(EventToUDTValueConverter)
コンバーターを手動で検索すると、それを使用して のインスタンスを に変換できますEvent
が、関連するオブジェクトUDTValue
を使用sc.saveToCassandra
して のインスタンスを渡すとRsvpResponse
、次のエラーが発生します。
15/06/23 23:56:29 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Event(EVENT9136830076436652815,First event,http://www.meetup.com/first-event,Some(1435100185774)) of type class model.Event to com.datastax.spark.connector.UDTValue.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:42)
at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1.applyOrElse(UserDefinedType.scala:33)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:40)
at com.datastax.spark.connector.types.UserDefinedType$$anon$1.convert(UserDefinedType.scala:31)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anonfun$readColumnValues$2.apply(DefaultRowWriter.scala:46)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anonfun$readColumnValues$2.apply(DefaultRowWriter.scala:43)
UDTValue
コネクタ ライブラリが内部で処理する方法が原因で、コンバータが呼び出されることさえないようです。ただし、上記のソリューションは、Cassandra テーブル (ユーザー定義型を含む) からのデータの読み取りには機能します。コネクタ ドキュメントに基づいて、ネストされたケース クラスをcom.datastax.spark.connector.UDTValue
型に直接置き換えました。これにより、説明されている問題は修正されますが、データの読み取りが中断されます。データの読み取りと書き込みのために 2 つの別々のモデルを定義するつもりだとは想像できません。それとも、ここで明らかな何かが欠けていますか?