2

寄木細工のファイルを読み取ってMemSQLテーブルに書き込むこのプログラムがあります。Sparkがファイルを正しく読み取っていることを次のように確認できます

df.printSchema()
df.show(5)

スキーマとデータを正しく出力します。

テーブルにクエリを実行すると、行のすべての NULL 値が取得されます。テーブルではすべてが NULL です。ここで何が問題なのかわかりません。

parquet ファイルを memsql に書き込むコード

package com.rb.scala

    import com.memsql.spark.context.MemSQLContext
    import java.sql.{ DriverManager, ResultSet, Connection, Timestamp }

    import org.apache.spark._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.catalyst.expressions.RowOrdering

    import com.memsql.spark.connector._
    import com.memsql.spark.connector.OnDupKeyBehavior._
    import com.memsql.spark.connector.dataframe._
    import com.memsql.spark.connector.rdd._

    import scala.util.control.NonFatal
    import org.apache.log4j.Logger
    object MemSQLWriter {

    def main(arg: Array[String]) {

    var logger = Logger.getLogger(this.getClass())

    if (arg.length < 1) {
      logger.error("=> wrong parameters number")
      System.err.println("Usage: MainExample <directory containing the source files to be loaded to database > ")
      System.exit(1)
    }

    val jobName = "MemSQLWriter"
    val conf = new SparkConf().setAppName(jobName)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val pathToFiles = arg(0)
    logger.info("=> jobName \"" + jobName + "\"")
    logger.info("=> pathToFiles \"" + pathToFiles + "\"")
    val dbHost = "xx.xx.xx.xx"
    val dbPort = 3306
    val dbName = "memsqlrdd_db"
    val user = "root"
    val password = ""
    val tableName = "target_table"
    val dbAddress = "jdbc:mysql://" + dbHost + ":" + dbPort
    val df = sqlContext.read.parquet("/projects/example/data/")
    val conn = DriverManager.getConnection(dbAddress, user, password)
    val stmt = conn.createStatement
    stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName)
    stmt.execute("USE " + dbName)
    stmt.execute("DROP TABLE IF EXISTS " + tableName)
    df.printSchema()
    df.show(5)
    var columnArr  = df.columns
    var createQuery:String = " CREATE TABLE "+tableName+" ("
    logger.info("=> no of columns : "+columnArr.length)
    for(column <- columnArr){
       createQuery += column
       createQuery += " VARCHAR(100),"
    }
    createQuery += " SHARD KEY ("+columnArr(0)+"))"
    logger.info("=> create table query "+createQuery)
    stmt.execute(createQuery)

    df.select().saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password, upsertBatchSize = 1000, useKeylessShardedOptimization = true)
    stmt.close()
  }
}
4

1 に答える 1

2

SHARD キーを使用してテーブルを作成してから を設定useKeylessShardingOptimization = trueすると、未定義の動作が発生します。これをfalseに設定すると、うまくいくはずです。

また、何が何をするのかわかりdf.select().saveToMemSQL...ません。試してみてくださいdf.saveToMemSQL ...

確認するときは、SELECT * FROM table WHERE col IS NOT NULL LIMIT 10実際にすべてヌルがあるかどうかを確認するなどのことを行います。

PS:df.createMemSQLTableAsあなたが望むことをする もあります。

于 2015-10-21T18:10:32.353 に答える