Spark のワーカー ノードごとにいくつのエグゼキュータが起動されますか? その背後にある数学を知ることができますか?
たとえば、6 つのワーカー ノードと 1 つのマスターがあり、spark-submit を介してジョブを送信すると、ジョブに対して起動されるエグゼキューターの最大数はいくつになりますか?
Spark のワーカー ノードごとにいくつのエグゼキュータが起動されますか? その背後にある数学を知ることができますか?
たとえば、6 つのワーカー ノードと 1 つのマスターがあり、spark-submit を介してジョブを送信すると、ジョブに対して起動されるエグゼキューターの最大数はいくつになりますか?
@LiMuBeiの答えに便乗する...
--num-executors 4
このドキュメント ( http://jerryshao.me/architecture/2015/08/22/spark-dynamic-allocation-investigation/ ) によると、
Spark が保留中および実行中のタスクを通じて必要なエグゼキューターの最大数を計算する方法:
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
現在のエグゼキュータ数が予想数よりも多い場合:
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests
val oldNumExecutorsTarget = numExecutorsTarget
numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
numExecutorsToAdd = 1
// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
}
numExecutorsTarget - oldNumExecutorsTarget
現在のエグゼキューターの数が必要な数よりも多い場合、Spark はクラスター マネージャーに保留中の要求をキャンセルするように通知します。すでに割り当てられているエグゼキュータについては、後でタイムアウト メカニズムによって妥当な数にまで減らされます。
現在のエグゼキュータ数が目的の数を満たさない場合:
val oldNumExecutorsTarget = numExecutorsTarget
// There's no point in wasting time ramping up to the number of executors we already have, so
// make sure our target is at least as much as our current allocation:
numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
// Boost our target with the number to add for this round:
numExecutorsTarget += numExecutorsToAdd
// Ensure that our target doesn't exceed what we need at the present moment:
numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
val delta = numExecutorsTarget - oldNumExecutorsTarget
// If our target has not changed, do not send a message
// to the cluster manager and reset our exponential growth
if (delta == 0) {
numExecutorsToAdd = 1
return 0
}
val addRequestAcknowledged = testing ||
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
if (addRequestAcknowledged) {
val executorsString = "executor" + { if (delta > 1) "s" else "" }
logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
1
}
delta
} else {
logWarning(
s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
0
}
考えられる答えは 2 つあります。
spark-submit
要求した量を取得する必要があります--num-executors X
ワーカー ノードあたりのエグゼキューターの数は、その時点で利用可能なリソースによって異なります。