2

これは、Spark に付属のサンプル コードです。ここにコードをコピーしました。これがリンクです: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala . ただし、コマンド「bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999」を使用してプログラムを実行しようとすると、次のエラーが発生しました。

14/07/20 11:52:57 ERROR ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.NoSuchMethodError: java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
    at org.apache.spark.streaming.scheduler.JobScheduler.getPendingTimes(JobScheduler.scala:114)
    at org.apache.spark.streaming.Checkpoint.<init>(Checkpoint.scala:43)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread "Thread-37" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/07/20 11:53:00 ERROR Executor: Exception in task ID 0
java.lang.IllegalStateException: cannot create children while terminating or terminated
    at akka.actor.dungeon.Children$class.makeChild(Children.scala:184)
    at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
    at akka.actor.ActorCell.attachChild(ActorCell.scala:338)
    at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.<init>(ReceiverSupervisorImpl.scala:67)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:263)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/07/20 11:53:06 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[spark-akka.actor.default-dispatcher-13,5,main]
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = HeartBeat(BlockManagerId(<driver>, x-131-212-225-148.uofm-secure.wireless.umn.edu, 47668, 0))]
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:251)
    at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:51)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:113)
    at org.apache.spark.storage.BlockManager$$anonfun$initialize$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(BlockManager.scala:158)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:790)
    at org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:158)
    at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
    at akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.run(Scheduler.scala:464)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:281)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:280)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at akka.actor.LightArrayRevolverScheduler.close(Scheduler.scala:279)
    at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:630)
    at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply$mcV$sp(ActorSystem.scala:582)
    at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582)
    at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582)
    at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:596)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:750)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:753)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746)
    at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15)
    at akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:746)
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593)
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/BlockManagerMaster#1887396223]] had already been terminated.
    at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:236)

****************コード********************

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)

      val previousCount = state.getOrElse(0)

      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in input stream of \n delimited test (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a Dstream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

ファイルが Hadoop と互換性のないファイルであるのに、コマンド「ssc.checkpoint(".")」を実行してローカル ファイル システムにチェックポイントを設定しようとしているからでしょうか? (チェックポイントを設定するには、ファイルが Hadoop と互換性がある必要があります) 互換性がある場合、どうすれば修正できますか? ありがとう!

4

1 に答える 1