DataFrame とカウントに対してさまざまなフィルター操作を実行してから、個々のカウントの合計を実行する必要があります。同時実行には Scala Future を使用します。コードは次のとおりです。
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
val f1 = Future{myDF.filter("pmod(idx, 8) = 1").count}
val f2 = Future{myDF.filter("pmod(idx, 8) = 2").count}
val f3 = Future{myDF.filter("pmod(idx, 8) = 3").count}
val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield {
c1 + c2 + c3
}
val summ = Await.result(future, 180 second)
各フィルター/カウント操作の実行時間は約 7 秒です。ただし、何度も実行した後、同時実行の合計時間は、予想した 7 秒ではなく、常に約 35 秒かかります。私はかなり長い間この動作に戸惑いましたが、理解できません。
3 台のマシン、1 つのマスター ノード、2 つのワーカー ノード、および各ノードに 128G メモリと 32 コアのクラスターがあります。データのサイズは約3Gです。同時実行中、1 つのワーカー ノードに 20 秒の GC 時間がかかることに気付きました。個々のフィルター/カウント操作に GC 時間がほとんどないように GC を調整しました。3 つの Future の同時実行を実行するたびに GC が起動する理由と、それが同時実行時間を長くする理由かどうかはわかりません。
誰でもこの問題について経験がありますか?