4

私は自分のコンピューターでローカルに Apache Spark の Web クローリング/スクラップ プログラムをテストしていました。

プログラムは、散発的に失敗する揮発性関数を使用するいくつかの RDD 変換を使用します。(この関数の目的は、URL リンクを Web ページに変換することです。ヘッドレス ブラウザが単にブラックアウトしたり、過負荷になったりすることがあります。これを避けることはできません)。

Apache Spark には強力なフェイルオーバーと再試行機能があり、失敗した変換や失われたデータは、見つけたリソースからゼロから再計算できると聞きました (魔法のように聞こえますよね?)。コード。

これは私のスパーク構成です:

val conf = new SparkConf().setAppName("MoreLinkedIn")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","40") //definitely enough

残念ながら、大部分のステージと個々のタスクが成功した後、ジョブは失敗しました。コンソールの最新のログには次のように表示されます。

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:7 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.openqa.selenium.TimeoutException: Timed out after 50 seconds waiting for...

一度失敗すると、Spark は臆病にあきらめているようです。より粘り強くするために適切に構成するにはどうすればよいですか?

(私のプログラムはhttps://github.com/tribbloid/spookystuffからダウンロードできます。コード/ドキュメントが少なくまとまりがなく申し訳ありません。数日間だけ開始します)

追加: 自分で試してみたい場合は、次のコードでこの問題を示すことができます。

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","400000")
val sc = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 8
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { i =>
  val x = java.lang.Math.random()
  if (x > 0.9) throw new IllegalStateException("the map has a chance of 10% to fail")
  x
}.reduce(_ + _)
sc.stop()
println("finished")
}

この投稿では、同じ IllegalStateException が 32 回再試行されていることに注意してください: Apache Spark Throws java.lang.IllegalStateException: unread block data

4

3 に答える 3

9

非常に古い質問であることは知っていますが、まったく同じ問題があり、解決策を探しているときにこの質問に出くわしました。

ローカル モードで Spark アプリケーションを送信するための3 つのマスター URL 形式があります。

  • local- 1 つのスレッド (並列処理なし)、再試行なし
  • local[K](またはlocal[*]) - K(またはコア数) ワーカー スレッドを使用し、 (こちらを参照)に設定task.maxFailuresします。1

  • local[K, F](またはlocal[*, F])- を設定しtask.maxFailures=Fます。これが私たちが求めていたものです。

詳細については、 Spark のドキュメントを参照してください。

于 2017-12-08T16:21:42.027 に答える