SparkR SparkDataFrame で Java メソッドを使用して Cassandra にデータを書き込めるようにしたいと考えています。
sparklyr
たとえば、拡張機能を使用すると、次のようなことができます。
sparklyr::invoke(sparklyr::spark_dataframe(spark_tbl), "write") %>>%
sparklyr::invoke("format", "org.apache.spark.sql.cassandra") %>>%
sparklyr::invoke("option", "keyspace", keyspace) %>>%
sparklyr::invoke("option", "table", table) %>>%
sparklyr::invoke("mode", "append") %>%
sparklyr::invoke("save")
これにより、毎秒約 20k 行の書き込み速度を実現できます。
ただし、私の使用例ではSparkR::spark.lapply
、Cassandra テーブルのサブセットをローカルで収集し、スクリプトを実行してデータを書き戻すことができるようにしたいと考えています。私が使用しようとしたすべての方法sparklyr
は、最終的にシングルスレッドになったため、実際にはスパークをまったく使用していません。
ではSparkR
、次のようなものを使用してデータを書き込むことができます。
SparkR::saveDF(SparkR::as.DataFrame(dt_local), "",
source = "org.apache.spark.sql.cassandra",
table = table,
keyspace = keyspace,
mode = "append")
ただし、この場合、書き込み速度は毎秒 2k 行に近くなります。より高い書き込み速度を達成するためSparkR::sparkR.callJMethod
に、ケースと同じチェーンを呼び出すことができると思いますが、最初に、できなかったハンドルを持つものをシリアル化する必要があります。これは可能ですか?sparklyr
SparkDataFrame
jobj
可能であれば、これを達成するための他の方法も受け入れます。sparkR
と の間を移動しようとして調査しましたsparklyr
が、バックエンドが違いすぎるようです (私の知る限り)。また、ここから、まだ類似のlapply
ものはないと信じてsparklyr
います。
助けてくれてありがとう