0

という DataFramelastTailを使用すると、次のように繰り返すことができます。

import scalikejdbc._
// ... 
// Do Kafka Streaming to create DataFrame lastTail
// ...

lastTail.printSchema

lastTail.foreachPartition(iter => {

// open database connection from connection pool
// with scalikeJDBC (to PostgreSQL) 

  while(iter.hasNext) {
    val item = iter.next()
    println("****")
    println(item.getClass)
    println(item.getAs("fileGid"))
    println("Schema: "+item.schema)
    println("String: "+item.toString())
    println("Seqnce: "+item.toSeq)

    // convert this item into an XXX format (like JSON)
    // write row to DB in the selected format
  }
})

これは「次のようなもの」を出力します(編集あり): root |-- fileGid: string (nullable = true) |-- eventStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true) |-- revisionStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true)

そして(反復項目が1つだけ - 編集されていますが、うまくいけば十分な構文でもあります)

**** class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 12345 Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false)) String: [12345,[1,4,edit],[1,4,revision]] Seqnce: WrappedArray(12345, [1,4,edit], [1,4,revision])

注: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scalaval metric = iter.sumのような部分を実行していますが、代わりに DataFrames を使用しています。http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuningにある「foreachRDD を使用するためのデザイン パターン」にも従っています 。

この org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema を変換するにはどうすればよいですか ( https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/を参照) spark/sql/catalyst/expressions/rows.scala ) 反復項目を、PostgreSQL に簡単に書き込めるもの (JSON または ...? - 私はオープンです) に変換します。(JSON でない場合は、別の時点で使用するためにこの値を DataFrame に読み込む方法を提案してください。)

4

1 に答える 1