SparkStreaming からのデータのストリーミングがあります。私は処理する必要があり、最終的にデータを Cassandra に保存したいと考えています。そのため、以前は SparkCassandra コネクタを使用しようとしていました。ただし、ワーカーで SparkStreaming Context オブジェクトにアクセスすることはできません。そのため、別の cassandra-scala ドライバーを使用する必要があります。したがって、私はファントムになってしまいました。さて、私の質問は、cassnandra で列ファミリーを既に定義していることです。では、scala から選択および更新クエリを実行するにはどうすればよいですか。
これらのドキュメントlink1に従いましたが、クライアント (scala コード) 側でテーブル定義を指定する必要がある理由がわかりません。Keyspace
を与えて、ClusterPoints
それで終わりにすることができないのはなぜですかColumnFamily
。
object CustomConnector {
val hosts = Seq("IP1", "IP2")
val Connector = ContactPoints(hosts).keySpace("KEYSPACE_NAME")
}
realTimeAgg.foreachRDD{ x => if (x.toLocalIterator.nonEmpty) {
x.foreachPartition {
How to achieve select/insert in Cassandra table here using phantom
}