0

SparkR SparkDataFrame で Java メソッドを使用して Cassandra にデータを書き込めるようにしたいと考えています。

sparklyrたとえば、拡張機能を使用すると、次のようなことができます。

sparklyr::invoke(sparklyr::spark_dataframe(spark_tbl), "write") %>>% 
sparklyr::invoke("format", "org.apache.spark.sql.cassandra") %>>% 
sparklyr::invoke("option", "keyspace", keyspace) %>>% 
sparklyr::invoke("option", "table", table) %>>%
sparklyr::invoke("mode", "append") %>% 
sparklyr::invoke("save")

これにより、毎秒約 20k 行の書き込み速度を実現できます。

ただし、私の使用例ではSparkR::spark.lapply、Cassandra テーブルのサブセットをローカルで収集し、スクリプトを実行してデータを書き戻すことができるようにしたいと考えています。私が使用しようとしたすべての方法sparklyrは、最終的にシングルスレッドになったため、実際にはスパークをまったく使用していません。

ではSparkR、次のようなものを使用してデータを書き込むことができます。

SparkR::saveDF(SparkR::as.DataFrame(dt_local), "",
               source = "org.apache.spark.sql.cassandra",
               table = table,
               keyspace = keyspace,
               mode = "append")

ただし、この場合、書き込み速度は毎秒 2k 行に近くなります。より高い書き込み速度を達成するためSparkR::sparkR.callJMethodに、ケースと同じチェーンを呼び出すことができると思いますが、最初に、できなかったハンドルを持つものをシリアル化する必要があります。これは可能ですか?sparklyrSparkDataFramejobj

可能であれば、これを達成するための他の方法も受け入れます。sparkRと の間を移動しようとして調査しましたsparklyrが、バックエンドが違いすぎるようです (私の知る限り)。また、ここから、まだ類似のlapplyものはないと信じてsparklyrいます。

助けてくれてありがとう

4

1 に答える 1