2

Datastax Enterprise 4.5 を使用しています。私は設定を正しく行ったことを願っています.datastaxのWebサイトで説明されているようにしました。Windows サービスを使用して Cassandra DB に書き込むことができます。これは機能しますが、where 関数を使用して Spark でクエリを実行することはできません。

「./dse cassandra -k -t」(/bin フォルダー内) を使用して Cassandra ノード (テスト用に 1 つだけあります) を開始するので、hadoop と spark の両方が実行されます。問題なく Cassandra に書き込むことができます。

そのため、'where' が RowKey でない場合、Cassandra クエリで 'where' 句を使用することはできません。そのため、Spark/Shark を使用する必要があります。必要なすべてのクエリをサメ (./dse shark) で開始して使用できますが、Scala または Java でスタンドアロン プログラムを作成する必要があります。

だから私はこのリンクを試しました: https://github.com/datastax/spark-cassandra-connector

そして、次のような単純なステートメントをクエリできます。

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "MY_IP")
  .setMaster("spark://MY_IP:7077")
  .setAppName("SparkTest")

// Connect to the Spark cluster:
lazy val sc = new SparkContext(conf)

val rdd = sc.cassandraTable("keyspace", "tablename")
println(rdd.first)

これはうまくいきますが、より多くの行または数を要求すると:

println(rdd.count)
rdd.toArray.foreach(println)

次に、この例外が発生します。

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Javaでこれを試すと、同じ問題が発生します。誰もこの問題を知っていますか?DB 構成が正しいかどうか、または scala/Javaprogram が正しく機能するかどうかはわかりません。一部のポートがブロックされている可能性がありますが、7077 と 4040 は開いています。

補足: Cassandra DB で spark を開始すると、次のようなクエリを実行できます。

sc.cassandraTable("test","words").select("word").toArray.foreach(println) 

しかし、次のような「where」句を使用すると:

sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println)

私はこの例外を受け取ります:

java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING

理由はありますか?Spark で where 句を使用できると思いましたか?

ありがとうございました!

4

2 に答える 2