0

サイズのRDD[(Long, String)]S3 パス (バケット + キー) があります。各パーティションのサイズの合計がほぼ同じ値になるパスを取得するように、パーティションを分割したいと考えています。そうすれば、これらのパスのコンテンツを読み取るときに、各パーティションで処理するデータ量がほぼ同じになるはずです。そのために、このカスタム パーティショナーを作成しました。

import org.apache.spark.Partitioner
import scala.collection.mutable.PriorityQueue

class S3Partitioner(partitions: Int, val totalSize: Long) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  require(totalSize >= 0, s"Number of totalSize ($totalSize) cannot be negative.")

  val pq = PriorityQueue[(Int, Long)]()
  (0 until partitions).foreach { partition =>
    pq.enqueue((partition, totalSize / partitions))
  }

  def getPartition(key: Any): Int = key match {
    case k: Long =>
      val (partition, capacityLeft) = pq.dequeue
      pq.enqueue((partition, capacityLeft - k))
      partition
    case _ => 0
  }

  def numPartitions: Int = partitions

  override def equals(other: Any): Boolean = other match {
    case p: S3Partitioner =>
      p.totalSize == totalSize && p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = {
    (972 * numPartitions.hashCode) ^ (792 * totalSize.hashCode)
  }
}

パーティショナーは、キー (サイズ) が降順で並べ替えられた RDD が供給された場合に最適なパフォーマンスを発揮するはずです。使用しようとすると、以前は機能していたコードで次のエラーが発生し始めました。

Cause: java.io.NotSerializableException: scala.collection.mutable.PriorityQueue$ResizableArrayAccess

これは私がそれを使用している方法です:

val pathsWithSize: RDD[(Long, String)] = ...
val totalSize = pathsWithSize.map(_._1).reduce(_ + _)

new PairRDDFunctions(pathsWithSize)
  .partitionBy(new S3Partitioner(5 * sc.defaultParallelism, totalSize))
  .mapPartitions { iter =>
    iter.map { case (_, path) => readS3(path) }
  }

そして、これを修正する方法がわかりません。助けていただければ幸いです。

4

0 に答える 0