Cassandra データベースがあり、そこから Apache Spark を介して SparkSQL を使用してデータを分析しました。次に、分析したデータを PostgreSQL に挿入します。PostgreSQL ドライバーを使用する以外に直接これを達成する方法はありますか (私は postREST と Driver を使用してそれを達成しましたsaveToCassandra()
。
7500 次
4 に答える
2
Postgres copy api を使用して書き込むことができます。その方がはるかに高速です。次の 2 つの方法を参照してください。1 つは RDD を反復処理して、コピー API で保存できるバッファーを埋めます。注意が必要なのは、コピー API で使用される csv 形式の正しいステートメントを作成することだけです。
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
val sb = mutable.StringBuilder.newBuilder
val now = System.currentTimeMillis()
rdd.collect().foreach(itr => {
itr.foreach(_.createCSV(sb, now).append("\n"))
})
copyIn("myTable", new StringReader(sb.toString), "statement")
sb.clear
}
def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
val conn = connectionPool.getConnection()
try {
conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
} catch {
case se: SQLException => logWarning(se.getMessage)
case t: Throwable => logWarning(t.getMessage)
} finally {
conn.close()
}
}
于 2015-05-26T23:00:57.737 に答える