現在、Spark 1.2.0 コネクタを備えた Apache Cassandra 2.1.2 クラスターを使用しています。いくつかの初期テストでは、spark-shell 内の Spark SQL コマンドを使用して、Cassandra テーブルからいくつかの行を選択する必要があります。
キースペースksでtabletestというテーブルを使用します。このテーブルには、たとえばid (bigint)とts (timestamp)が含まれています。
ここに私のスパークスクリプトがあります:
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)
cc.setKeyspace("ks")
val rdd = cc.sql("SELECT id,ts FROM tabletest LIMIT 100")
rdd.toArray.foreach(println)
コマンドを使用してこのスクリプトを実行すると:
spark-shell -i myscript
行に ts セルの空の値が含まれるまで、すべて問題ありません。ts の値が空の行がある場合、spark が長い値 (8 バイト) を待機し、バイトを取得しないという事実に関連するいくつかの例外が発生しました。行を表示せずに行数を数えようとしても、同じ問題が発生しました。
15/01/29 15:21:35 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
com.datastax.driver.core.exceptions.InvalidTypeException: Invalid 64-bits long value, expecting 8 bytes but got 0
at com.datastax.driver.core.TypeCodec$LongCodec.deserializeNoBoxing(TypeCodec.java:452)
at com.datastax.driver.core.TypeCodec$DateCodec.deserialize(TypeCodec.java:826)
at com.datastax.driver.core.TypeCodec$DateCodec.deserialize(TypeCodec.java:748)
at com.datastax.driver.core.DataType.deserialize(DataType.java:606)
at com.datastax.spark.connector.AbstractGettableData$.get(AbstractGettableData.scala:88)
at org.apache.spark.sql.cassandra.CassandraSQLRow$$anonfun$fromJavaDriverRow$1.apply$mcVI$sp(CassandraSQLRow.scala:42)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.sql.cassandra.CassandraSQLRow$.fromJavaDriverRow(CassandraSQLRow.scala:41)
at org.apache.spark.sql.cassandra.CassandraSQLRow$CassandraSQLRowReader$.read(CassandraSQLRow.scala:49)
at org.apache.spark.sql.cassandra.CassandraSQLRow$CassandraSQLRowReader$.read(CassandraSQLRow.scala:46)
at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:378)
at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:378)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:13)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
このような空の値をどのように処理できますか?SQL クエリ内でいくつかの関数を使用して空の値をデフォルト値に置き換える必要がありますか?それとも、スクリプトでいくつかのメソッドまたはパラメーターを使用して、spark がそのような空の値を処理できるようにすることができますか?
ご協力いただきありがとうございます、
一番
ニコラス