4

Spark のワーカー ノードごとにいくつのエグゼキュータが起動されますか? その背後にある数学を知ることができますか?

たとえば、6 つのワーカー ノードと 1 つのマスターがあり、spark-submit を介してジョブを送信すると、ジョブに対して起動されるエグゼキューターの最大数はいくつになりますか?

4

2 に答える 2

4

@LiMuBeiの答えに便乗する...

まず、それはあなたが言うことです

--num-executors 4

ダイナミック アロケーションを使用している場合は、このように決定されます

このドキュメント ( http://jerryshao.me/architecture/2015/08/22/spark-dynamic-allocation-investigation/ ) によると、

Spark が保留中および実行中のタスクを通じて必要なエグゼキューターの最大数を計算する方法:

 private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
 }

現在のエグゼキュータ数が予想数よりも多い場合:

 // The target number exceeds the number we actually need, so stop adding new
 // executors and inform the cluster manager to cancel the extra pending requests
 val oldNumExecutorsTarget = numExecutorsTarget
 numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
 numExecutorsToAdd = 1

 // If the new target has not changed, avoid sending a message to the cluster manager
 if (numExecutorsTarget < oldNumExecutorsTarget) {
   client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
   logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
     s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
 }
 numExecutorsTarget - oldNumExecutorsTarget

現在のエグゼキューターの数が必要な数よりも多い場合、Spark はクラスター マネージャーに保留中の要求をキャンセルするように通知します。すでに割り当てられているエグゼキュータについては、後でタイムアウト メカニズムによって妥当な数にまで減らされます。

現在のエグゼキュータ数が目的の数を満たさない場合:

 val oldNumExecutorsTarget = numExecutorsTarget

 // There's no point in wasting time ramping up to the number of executors we already have, so
 // make sure our target is at least as much as our current allocation:
 numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)

 // Boost our target with the number to add for this round:
 numExecutorsTarget += numExecutorsToAdd

 // Ensure that our target doesn't exceed what we need at the present moment:
 numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)

 // Ensure that our target fits within configured bounds:
 numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
 val delta = numExecutorsTarget - oldNumExecutorsTarget

 // If our target has not changed, do not send a message
 // to the cluster manager and reset our exponential growth
 if (delta == 0) {
   numExecutorsToAdd = 1
   return 0
 }
 val addRequestAcknowledged = testing ||
   client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
 if (addRequestAcknowledged) {
   val executorsString = "executor" + { if (delta > 1) "s" else "" }
   logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
     s" (new desired total will be $numExecutorsTarget)")
   numExecutorsToAdd = if (delta == numExecutorsToAdd) {
     numExecutorsToAdd * 2
   } else {
     1
   }
   delta
 } else {
   logWarning(
     s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
   0
 }
于 2016-10-11T01:46:23.690 に答える
2

考えられる答えは 2 つあります。

  • 呼び出し時にエグゼキューターの量を指定すると、spark-submit要求した量を取得する必要があります--num-executors X
  • 指定しない場合、デフォルトで、Spark は必要に応じてより多くのエグゼキューターを開始する動的割り当てを使用する必要があります。この場合、実行者の最大数などの動作を構成できます。http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation を参照してください。

ワーカー ノードあたりのエグゼキューターの数は、その時点で利用可能なリソースによって異なります。

于 2016-10-10T22:07:49.547 に答える