For the life of me, I cannot understand why this does not serialize. I am running the following in spark-shell (paste mode), on Spark 1.3.1, Cassandra 2.1.6 and Scala 2.10:

import org.apache.spark._
import com.datastax.spark.connector._

val driverPort = 7077
val driverHost = "localhost"
val conf = new SparkConf(true)
  .set("spark.driver.port", driverPort.toString)
  .set("spark.driver.host", driverHost)
  .set("spark.logConf", "true")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext("local[*]", "test", conf)
case class Test(id: String, creationdate: String) extends Serializable

sc.parallelize(Seq(Test("98429740-2933-11e5-8e68-f7cca436f8bf", "2015-07-13T07:48:47.924Z")))
  .saveToCassandra("testks", "test", SomeColumns("id", "creationdate"))

sc.cassandraTable[Test]("testks", "test").toArray
sc.stop()

I started spark-shell with:

./spark-shell -Ddriver-class-path=/usr/local/spark/libs/* -Dsun.io.serialization.extendedDebugInfo=true

Including the -Dsun.io.serialization.extendedDebugInfo=true property made no visible difference.

The full error (edited):

java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
        at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
        at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:149)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:464)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:232)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:227)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:296)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:294)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:294)
        at org.apache.spark.scheduler.local.LocalActor.reviveOffers(LocalBackend.scala:81)
        at org.apache.spark.scheduler.local.LocalActor$$anonfun$receiveWithLogging$1.applyOrElse(LocalBackend.scala:63)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.local.LocalActor.aroundReceive(LocalBackend.scala:45)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
        at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
        at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
        ... 45 more
15/08/31 09:04:45 ERROR scheduler.TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 was not serializable

And a different trace, from the worker logs:

org.apache.spark.SparkContext.runJob(SparkContext.scala:1505)
com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:38)
$line15.$read$$iwC$$iwC.<init>(<console>:30)
$line15.$read$$iwC.<init>(<console>:51)
$line15.$read.<init>(<console>:53)
$line15.$read$.<init>(<console>:57)
$line15.$read$.<clinit>(<console>)
$line15.$eval$.<init>(<console>:7)
$line15.$eval$.<clinit>(<console>)
$line15.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:824)

2 Answers

I am fairly sure the reason for the error is that the Scala REPL wraps every expression in an object before the compile and evaluate phases (see "Is it reasonable to use Scala's REPL for comparative performance benchmarks?"). While wrapping an expression, the REPL captures every object from the surrounding environment, and many of those objects may be non-serializable vals that cannot safely be shipped to the driver (a remote process).
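
To illustrate the mechanism, here is a minimal, self-contained sketch (ReplLineWrapper and InnerClassCapture are illustrative names, not the actual code the REPL generates): a case class defined inside another class becomes an inner class with a hidden $outer reference, so serializing an instance drags the whole wrapper, and everything defined next to it, into the object graph:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for the $iwC wrapper object the REPL builds around each pasted line.
class ReplLineWrapper {
  val notSerializable = new Object // anything else defined on the line is captured too
  case class Test(id: String, creationdate: String)
}

object InnerClassCapture extends App {
  val wrapper = new ReplLineWrapper
  val t = wrapper.Test("id", "date") // t secretly holds $outer = wrapper

  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  try oos.writeObject(t) // fails: the non-serializable wrapper is pulled in with t
  catch { case e: NotSerializableException => println(s"As expected: $e") }
}

The same thing happens to Test defined in spark-shell: it is compiled as an inner class of the generated $iwC wrappers visible in the worker-log trace above.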

The solution is to define the case class outside the Spark shell and include it on the CLASSPATH of both the driver and the executors with --jars, as sketched below.
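
For example (the file and jar names here are purely illustrative), move the model into its own compiled source file and hand the resulting jar to spark-shell:

// Test.scala -- a top-level case class compiled outside the REPL,
// so it carries no hidden $outer reference to any wrapper object.
case class Test(id: String, creationdate: String)

// One possible way to build and launch:
//   scalac Test.scala && jar cf test-model.jar Test*.class
//   ./spark-shell --jars test-model.jar

Inside the shell, Test then resolves to the top-level class from the jar and serializes cleanly.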
