0

私はspark cassandraコネクタ1.2.3でspark 1.2を使用しています.テーブルの一部の行を更新しようとしています:

例:

CREATE TABLE myTable ( 
a text, 
b text, 
c text, 
date timestamp, 
d text, 
e text static, 
f text static, 
PRIMARY KEY ((a, b, c), date, d) 
) WITH CLUSTERING ORDER BY (date ASC, d ASC)

val interactions = sc.cassandraTable[(String, String, String, DateTime, String, String)]("keySpace", "myTable"). 
select("a","b","c","date", "d", "e","f") 
val empty = interactions.filter(r => r._6 == null).cache() 
empty.count()

「e」のnullを含む行の数を数え、「b」の値でそれらを置き換えます

 val update_inter = empty.map( r =>  (r._1,r._2, r._3, r._4, r._5, r._2)) 
 update_inter.saveToCassandra("keySpace", "myTable", SomeColumns("a","b","c","date", "d", "e", "f"))

これは cqlsh でチェックインすると機能しますが、spark cassandra で同じ行を要求すると値 null が返されます。

これは spark cassandra コネクタのバグですか? ご協力いただきありがとうございます。

4

2 に答える 2

0

挿入/更新が発生すると、行をその場で上書きするのではなく、Cassandra は挿入または更新されたデータの新しいタイムスタンプ付きバージョンを別の SSTable に書き込みます。

Spark ジョブが既存の行を更新せずに新しい行を書き込んでいるか、SSTable がまだディスクに変更を書き込んでいません。結果を新しいテーブルに書き込む場合、null 'e' 列の数はゼロになります。

nodetool flush コマンドを試して、これを読んでください: Cassandra Compaction

于 2015-09-28T16:20:02.843 に答える