9

Cassandra データベースがあり、そこから Apache Spark を介して SparkSQL を使用してデータを分析しました。次に、分析したデータを PostgreSQL に挿入します。PostgreSQL ドライバーを使用する以外に直接これを達成する方法はありますか (私は postREST と Driver を使用してそれを達成しましたsaveToCassandra()

4

4 に答える 4

2

Postgres copy api を使用して書き込むことができます。その方がはるかに高速です。次の 2 つの方法を参照してください。1 つは RDD を反復処理して、コピー API で保存できるバッファーを埋めます。注意が必要なのは、コピー API で使用される csv 形式の正しいステートメントを作成することだけです。

def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
        val sb = mutable.StringBuilder.newBuilder
        val now = System.currentTimeMillis()

        rdd.collect().foreach(itr => {
            itr.foreach(_.createCSV(sb, now).append("\n"))
        })

        copyIn("myTable",  new StringReader(sb.toString), "statement")
        sb.clear
    }


def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
        val conn = connectionPool.getConnection()
        try {
            conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
        } catch {
            case se: SQLException => logWarning(se.getMessage)
            case t: Throwable => logWarning(t.getMessage)
        } finally {
            conn.close()
        }
    }
于 2015-05-26T23:00:57.737 に答える