1

DataFrame とカウントに対してさまざまなフィルター操作を実行してから、個々のカウントの合計を実行する必要があります。同時実行には Scala Future を使用します。コードは次のとおりです。

import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

val f1  = Future{myDF.filter("pmod(idx, 8) = 1").count}
val f2  = Future{myDF.filter("pmod(idx, 8) = 2").count}
val f3  = Future{myDF.filter("pmod(idx, 8) = 3").count}

val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield {
   c1 + c2 + c3 
}

val summ = Await.result(future, 180 second)

各フィルター/カウント操作の実行時間は約 7 秒です。ただし、何度も実行した後、同時実行の合計時間は、予想した 7 秒ではなく、常に約 35 秒かかります。私はかなり長い間この動作に戸惑いましたが、理解できません。

3 台のマシン、1 つのマスター ノード、2 つのワーカー ノード、および各ノードに 128G メモリと 32 コアのクラスターがあります。データのサイズは約3Gです。同時実行中、1 つのワーカー ノードに 20 秒の GC 時間がかかることに気付きました。個々のフィルター/カウント操作に GC 時間がほとんどないように GC を調整しました。3 つの Future の同時実行を実行するたびに GC が起動する理由と、それが同時実行時間を長くする理由かどうかはわかりません。

誰でもこの問題について経験がありますか?

4

1 に答える 1

1

スクリプト内の各ジョブは、ジョブが操作するデータ間の優先関係を定義するジョブの DAG 内のノードであるため、ジョブはクラスタ全体で順次スケジュールされます。そして、スクリプト全体を正常に実行するには、その優先順位を尊重する必要があります。

このルールは、ジョブ間に先行関係がない場合でも適用されます (ただし、それらはすべて同じデータ、myDF に依存します)。また、Futures の使用は、ジョブがほぼ同時にスケジューラに送信されることを意味するだけであり、そのような方法でスケジュールされることにはなりません。

並列処理が必要な場合は、ジョブ内に次のように記述する必要があります。

myDF.filter("pmod(idx,8) < 4 && pmod(idx,8) > 0").groupBy("pmod(idx,8)").count()

はい、キャッシュする必要がありますmyDf

于 2015-11-09T08:17:56.157 に答える