0

からイベントを取得しKafkaて保存してCassandraいます。次のようなテーブルの列を作成するためのjsonフィールドを含む解析:eventID, sessionID, timestamp, userIDCassandra

cassandra@cqlsh> CREATE TABLE mydata.events (
   ...     "event_date" date,
   ...     "eventID" text,
   ...     "userID" text,
   ...     timestamp timeuuid,
   ...     "sessionID" text,
   ...     "fullJson" text,
   ...     PRIMARY KEY ("event_date", timestamp, "sessionID")

そしてコードで:

case class cassandraFormat(
                       eventID: String, 
                       sessionID: String,
                       timeuuid: UUID, // timestamp as timeuuid
                       userID: String,
                       event_date: LocalDate, // YYYY-MM-dd format
                       fullJson: String // full json from Kafka
                     )

timestampとして列を追加する必要がありますtimeuuid。から解析しているのでjson、ヘッダーからすべての値を抽出し、この方法で列を作成しました。

 val allJson = rdd.
            map(x => {
              implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
              //use serialization default to format a Map to JSON
              (x, Serialization.write(x))
            }).
            filter(x => x._1 isDefinedAt "header").
            map(x => (x._1("header"), x._2)).
            filter(x => (x._1 isDefinedAt "userID") &&
              (x._1 isDefinedAt "eventID") &&
              (x._1 isDefinedAt "sessionID") &&
              (x._1 isDefinedAt "timestamp").
            map(x => cassFormat(x._1("eventID").toString,
              x._1("sessionID").toString,
              com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong),
              x._1("userID").toString,
              com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(x._1("timestamp").toString.toLong),
              x._2))

この部分:

com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong)

エラーが発生しています

java.lang.NumberFormatException: 入力文字列の場合: "2019-05-09T09:00:52.553+0000" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

試してみました: java.util.UUID.fromString(x._1("timestamp").toString、同じエラーも生成します。適切にキャスト/変換timestampし、 spark ジョブ経由timeuuidで挿入する方法Cassandra

4

3 に答える 3

0

数値ではない文字列があり、 を使用して数値に変換しようとしていtoLongます。したがって例外です。

thisを見ると、次のメソッドを使用してタイムスタンプに基づいて UUID を取得できるようです。

public static UUID getTimeUUID(long when)

文字列をDateTimeまたはに解析しInstant、そのDateTime/Instantのミリ秒を渡す必要がありますgetTimeUUID

于 2019-05-09T13:52:26.243 に答える