5

キースペースのテーブルを更新しているときに、scala の spark cassandra コネクタの問題に直面しています

ここに私のコードがあります

val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
                        " SET a= a + " + b + " WHERE x=" +
                        x + " AND y=" + y +
                        " AND z=" + x

println(query)

val KeySpace    = new CassandraSQLContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)

hourUniqueKeySpace.sql(query)

このコードを実行すると、このようなエラーが発生します

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found

なぜこれが起こっているのですか?どうすればこれを修正できますか?

4

2 に答える 2

6

カウンター列を持つテーブルの UPDATE は、spark-cassandra-connector を介して実行できます。モード「追加」(または必要に応じてSaveMode .Append)で保存するDataFramesおよびDataFrameWriterメソッドを使用する必要があります。コードDataFrameWriter.scalaを確認してください。

たとえば、次の表があるとします。

cqlsh:test> SELECT * FROM name_counter ;

 name    | surname | count
---------+---------+-------
    John |   Smith |   100
   Zhang |     Wei |  1000
 Angelos |   Papas |    10

コードは次のようになります。

val updateRdd = sc.parallelize(Seq(Row("John",    "Smith", 1L),
                                   Row("Zhang",   "Wei",   2L),
                                   Row("Angelos", "Papas", 3L)))

val tblStruct = new StructType(
    Array(StructField("name",    StringType, nullable = false),
          StructField("surname", StringType, nullable = false),
          StructField("count",   LongType,   nullable = false)))

val updateDf  = sqlContext.createDataFrame(updateRdd, tblStruct)

updateDf.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "test", "table" -> "name_counter"))
.mode("append")
.save()

更新後:

 name    | surname | count
---------+---------+-------
    John |   Smith |   101
   Zhang |     Wei |  1002
 Angelos |   Papas |    13

DataFrame 変換は、RDD を DataFrame に暗黙的に変換しimport sqlContext.implicits._.toDF().

このおもちゃのアプリケーションの完全なコードを確認してください: https://github.com/kyrsideris/SparkUpdateCassandra/tree/master

ここではバージョンが非常に重要であるため、上記は Scala 2.11.7、Spark 1.5.1、spark-cassandra-connector 1.5.0-RC1-s_2.11、Cassandra 3.0.5 に適用されます。DataFrameWriter は since と指定され@Experimentalてい@since 1.4.0ます。

于 2016-04-21T11:29:11.217 に答える
3

SPARK コネクタを介してネイティブに更新することはできないと思います。ドキュメントを参照してください:

「Spark Cassandra コネクタのデフォルトの動作は、cassandra テーブルに挿入されたときにコレクションを上書きすることです。この動作をオーバーライドするには、カスタム マッパーを指定して、コレクションをどのように処理するかを指示できます。」

したがって、既存のキーを使用して新しいレコードを実際に INSERT する必要があります。

于 2015-08-06T01:40:08.653 に答える