次のドライバー プログラムを使用してスタンドアロン モードで Spark 0.7.2 を使用し、7 つのワーカーと 1 つの個別のマスターを使用して最大 90 GB (圧縮: 19 GB) のログデータを処理しています。
System.setProperty("spark.default.parallelism", "32")
val sc = new SparkContext("spark://10.111.1.30:7077", "MRTest", System.getenv("SPARK_HOME"), Seq(System.getenv("NM_JAR_PATH")))
val logData = sc.textFile("hdfs://10.111.1.30:54310/logs/")
val dcxMap = logData.map(line => (line.split("\\|")(0),
line.split("\\|")(9)))
.reduceByKey(_ + " || " + _)
dcxMap.saveAsTextFile("hdfs://10.111.1.30:54310/out")
ShuffleMapTasks
ステージ 1 のすべてが完了したら:
Stage 1 (reduceByKey at DcxMap.scala:31) finished in 111.312 s
ステージ 0 を送信します。
Submitting Stage 0 (MappedRDD[6] at saveAsTextFile at DcxMap.scala:38), which is now runnable
いくつかのシリアル化の後、印刷されます
spark.MapOutputTrackerActor - Asked to send map output locations for shuffle 0 to host23
spark.MapOutputTracker - Size of output statuses for shuffle 0 is 2008 bytes
spark.MapOutputTrackerActor - Asked to send map output locations for shuffle 0 to host21
spark.MapOutputTrackerActor - Asked to send map output locations for shuffle 0 to host22
spark.MapOutputTrackerActor - Asked to send map output locations for shuffle 0 to host26
spark.MapOutputTrackerActor - Asked to send map output locations for shuffle 0 to host24
spark.MapOutputTrackerActor - Asked to send map output locations for shuffle 0 to host27
spark.MapOutputTrackerActor - Asked to send map output locations for shuffle 0 to host28
この後、何も起こらず、top
ワーカーがすべてアイドル状態になったことも示唆しています。ワーカー マシンのログを見ると、それぞれで同じことが起こります。
13/06/21 07:32:25 INFO network.SendingConnection: Initiating connection to [host27/127.0.1.1:34288]
13/06/21 07:32:25 INFO network.SendingConnection: Initiating connection to [host27/127.0.1.1:36040]
13/06/21 07:32:25 INFO network.SendingConnection: Initiating connection to [host27/127.0.1.1:50467]
13/06/21 07:32:25 INFO network.SendingConnection: Initiating connection to [host27/127.0.1.1:60833]
13/06/21 07:32:25 INFO network.SendingConnection: Initiating connection to [host27/127.0.1.1:49893]
13/06/21 07:32:25 INFO network.SendingConnection: Initiating connection to [host27/127.0.1.1:39907]
次に、これらの「接続の開始」試行ごとに、各ワーカーで同じエラーをスローします (例として host27 のログを示し、エラーの最初の発生のみを示します)。
13/06/21 07:32:25 WARN network.SendingConnection: Error finishing connection to host27/127.0.1.1:49893
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:701)
at spark.network.SendingConnection.finishConnect(Connection.scala:221)
at spark.network.ConnectionManager.spark$network$ConnectionManager$$run(ConnectionManager.scala:127)
at spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:70)
なぜこれが起こるのですか?ワーカー同士は問題なく通信できているようですが、唯一の問題は、自分自身にメッセージを送信したい場合に発生するようです。上記の例では、host27 は自分自身に 6 つのメッセージを送信しようとしますが、6 回失敗します。他のワーカーへのメッセージの送信は正常に機能します。誰かがアイデアを持っていますか?
編集:おそらく127.0を使用するsparkに関係しています。127.0 ではなく1 .1。0.1 ?
/etc/hosts
次のようになります。
127.0.0.1 localhost
127.0.1.1 host27.<ourdomain> host27