Kafka トピックから json を読み取るスナッピー ストリーミング テーブルがあります。いくつかの作業の後、これが機能するようになりましたが、オブジェクトからストリーミング テーブルに java.sql.Timestamp
値をマップしようとしたときに問題が発生しました。SensorData
org.apache.spark.sql.catalyst.CatalystTypeConverters
このメソッドの 318 行目でエラーが発生していました。
private object StringConverter extends CatalystTypeConverter[Any, String, UTF8String] {
override def toCatalystImpl(scalaValue: Any): UTF8String = scalaValue match {
case str: String => UTF8String.fromString(str)
case utf8: UTF8String => utf8
}
override def toScala(catalystValue: UTF8String): String =
if (catalystValue == null) null else catalystValue.toString
override def toScalaImpl(row: InternalRow, column: Int): String =
row.getUTF8String(column).toString
}
デバッグを実行したところ、コードは明らかにここで文字列値を想定していましたが、私の sensorData オブジェクト (およびストリーミング テーブル) センサーと収集時間はタイムスタンプです。したがって、値を変換できないと不平を言っていました。
以下は、Kafka から受信した json メッセージの値をマップするために使用する SensorData クラスです。次に、カスタム コンバーターで、これらの値をメソッド Seq[Row]
内の my にマップします。toRows(...)
class SensorData {
var sensor_id: String = _
var metric: String = _
var collection_time: java.sql.Timestamp = _
var sensor_time: java.sql.Timestamp = _
// var collection_time: String = _
// var sensor_time: String = _
var value: String = _
var year_num: Int = _
var month_num: Int = _
var day_num: Int = _
var hour_num: Int = _
}
ここに私のストリーミングテーブルがあります:
snsc.sql(s"CREATE STREAM TABLE sensor_data_stream if not exists " +
"(sensor_id string, " +
"metric string, " +
"collection_time TIMESTAMP, " +
"value VARCHAR(128), " +
"sensor_time TIMESTAMP, " +
"year_num integer, " +
"month_num integer, " +
"day_num integer, " +
"hour_num integer " +
") " +
"using kafka_stream " +
"options (storagelevel 'MEMORY_AND_DISK_SER_2', " +
"rowConverter 'org.me.streaming.sensor.test.converter.SensorConverter', " +
"zkQuorum 'localhost:2181', " +
" groupId 'sensorConsumer', topics 'sensorTest:01')")
この問題を回避するために、SensorData オブジェクトのデータ型を文字列に変更し、ストリーミング テーブルの列のデータ型を次のように変更しました。
"collection_time string, " +
"sensor_time string, " +
その結果、このデータ型の変更を行った後、Kafka からターゲット列テーブルにデータを正常にストリーミングすることができました。
私の質問...私はSnappyData/ストリーミングの世界にかなり慣れていないので、これがバグ(既知/未知)であるかどうか、またはタイムスタンプデータ型をストリーミングテーブルにバインドするよりエレガントな方法があるかどうかを知りたいですか?
******応答ごとに更新********
ここに私の行コンバーターがあります:
class SensorConverter extends StreamToRowsConverter with Serializable {
override def toRows(message: Any): Seq[Row] = {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val sensor = mapper.readValue(message.toString(), classOf[SensorData])
Seq(Row.fromSeq(Seq(
sensor.sensor_id,
sensor.metric,
sensor.collection_time,
sensor.value,
sensor.sensor_time,
sensor.year_num,
sensor.month_num,
sensor.day_num,
sensor.hour_num)))
}
}
私は最初に Java オブジェクトを変換しようとしましたが、それをデコードする際に問題が発生していました (おそらく、私が立ち上げたときに API の知識が不足していたためだと思われます)。最終的に、JSON 文字列を Kafka に渡すだけになりました。
@ https://github.com/SnappyDataInc/snappy-poc/blob/master/src/main/scala/io/snappydata/adanalytics/Codec.scala で提供された例で、受信するタイムスタンプ値を適切にラップしていないことがわかりますSeq[Row] を構築するときの java.sql.Timestamp 呼び出しで (これは長々と入ってきます)。それが私の問題を解決するかどうかを確認するために、それを試してみます。