0

I am submitting my spark jobs from a local laptop to a remote standalone Spark cluster (spark://IP:7077). It is submitted successfully. However, I do not get any output and it fails after some time. When i check the workers on my cluster, I find the following exception:

Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]

When I run the same code on my local system (local[*]), it runs successfully and gives the output.

Note that I run it in spark notebook. The same application runs successfully on the remote standalone cluster when i submit it via terminal using spark-submit

Am I missing something in the configuration of notebook? Any other possible causes?

The code is very simple.

Detailed exception:

Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
    at akka.actor.ActorRef.tell(ActorRef.scala:125)
    at akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44)
    at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
    at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
    at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
    at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
    at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Sample code

val logFile = "hdfs://hostname/path/to/file"
val conf = new SparkConf() 
.setMaster("spark://hostname:7077") // as appears on hostname:8080
.setAppName("myapp")
.set("spark.executor.memory", "20G")
.set("spark.cores.max", "40")
.set("spark.executor.cores","20")
.set("spark.driver.allowMultipleContexts","true")

val sc2 = new SparkContext(conf)
val logData = sc2.textFile(logFile)
val numAs = logData.filter(line => line.contains("hello")).count()
val numBs = logData.filter(line => line.contains("hi")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
4

2 に答える 2

1

アップデート:

上記の問題は、アプリケーション コード内にドライバーの IP アドレス (つまり、ローカル ラップトップのパブリック IP) を含めることで回避できます。これは、spark コンテキストに次の行を追加することで実行できます。

.set("spark.driver.host",YourSystemIPAddress)

ただし、ドライバーの IP アドレスが NAT の背後にある場合、問題が発生する可能性があります。この場合、ワーカーは IP を見つけることができません。

于 2016-01-21T19:09:38.347 に答える