0

SparkStreaming からのデータのストリーミングがあります。私は処理する必要があり、最終的にデータを Cassandra に保存したいと考えています。そのため、以前は SparkCassandra コネクタを使用しようとしていました。ただし、ワーカーで SparkStreaming Context オブジェクトにアクセスすることはできません。そのため、別の cassandra-scala ドライバーを使用する必要があります。したがって、私はファントムになってしまいました。さて、私の質問は、cassnandra で列ファミリーを既に定義していることです。では、scala から選択および更新クエリを実行するにはどうすればよいですか。

これらのドキュメントlink1に従いましたが、クライアント (scala コード) 側でテーブル定義を指定する必要がある理由がわかりません。Keyspaceを与えて、ClusterPointsそれで終わりにすることができないのはなぜですかColumnFamily

     object CustomConnector {
       val hosts = Seq("IP1", "IP2")
       val Connector = ContactPoints(hosts).keySpace("KEYSPACE_NAME")
    }

      realTimeAgg.foreachRDD{ x => if (x.toLocalIterator.nonEmpty) {
                                x.foreachPartition {
                         How to achieve select/insert in Cassandra table here using phantom
    }
4

1 に答える 1

0

ファントムを使用してこれを行うことはまだできません。これをphantom-spark可能にするために積極的に取り組んでいますが、現段階ではまだ数か月先です。

当面は、spark cassandra コネクタに依存し、タイプセーフでない API を使用してこれを実現する必要があります。これはより残念な設定ですが、近い将来に解決される予定です。

于 2016-09-15T20:25:33.190 に答える