という DataFramelastTail
を使用すると、次のように繰り返すことができます。
import scalikejdbc._
// ...
// Do Kafka Streaming to create DataFrame lastTail
// ...
lastTail.printSchema
lastTail.foreachPartition(iter => {
// open database connection from connection pool
// with scalikeJDBC (to PostgreSQL)
while(iter.hasNext) {
val item = iter.next()
println("****")
println(item.getClass)
println(item.getAs("fileGid"))
println("Schema: "+item.schema)
println("String: "+item.toString())
println("Seqnce: "+item.toSeq)
// convert this item into an XXX format (like JSON)
// write row to DB in the selected format
}
})
これは「次のようなもの」を出力します(編集あり):
root
|-- fileGid: string (nullable = true)
|-- eventStruct: struct (nullable = false)
| |-- eventIndex: integer (nullable = true)
| |-- eventGid: string (nullable = true)
| |-- eventType: string (nullable = true)
|-- revisionStruct: struct (nullable = false)
| |-- eventIndex: integer (nullable = true)
| |-- eventGid: string (nullable = true)
| |-- eventType: string (nullable = true)
そして(反復項目が1つだけ - 編集されていますが、うまくいけば十分な構文でもあります)
****
class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
12345
Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false))
String: [12345,[1,4,edit],[1,4,revision]]
Seqnce: WrappedArray(12345, [1,4,edit], [1,4,revision])
注: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scalaval metric = iter.sum
のような部分を実行していますが、代わりに DataFrames を使用しています。http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuningにある「foreachRDD を使用するためのデザイン パターン」にも従っています 。
この org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema を変換するにはどうすればよいですか ( https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/を参照) spark/sql/catalyst/expressions/rows.scala ) 反復項目を、PostgreSQL に簡単に書き込めるもの (JSON または ...? - 私はオープンです) に変換します。(JSON でない場合は、別の時点で使用するためにこの値を DataFrame に読み込む方法を提案してください。)