0

作業するバッチを指定すると、基になる関数が大幅に効率的に動作する状況があります。次のような既存のコードがあります。

// subjects: RDD[Subject]
val subjects = Subject.load(job, sparkContext, config)
val classifications = subjects.flatMap(subject => classify(subject)).reduceByKey(_ + _)
classifications.saveAsTextFile(config.output)

このclassify方法は単一の要素に対して機能しますが、要素のグループに対してより効率的に操作できます。coalesceRDD をチャンクに分割し、各チャンクをグループとして処理することを検討しましたが、これには 2 つの問題があります。

  1. マップされた RDD を返す方法がわかりません。
  2. classifyグループの大きさは事前にわからず、入力の内容によって異なります。

理想的な状況で呼び出す方法のサンプル コードclassify(非常に大きな入力に対してスピルできないため、出力はぎこちなくなります):

def classifyRdd (subjects: RDD[Subject]): RDD[(String, Long)] = {
  val classifier = new Classifier
  subjects.foreach(subject => classifier.classifyInBatches(subject))
  classifier.classifyRemaining
  classifier.results
}

この方法でclassifyInBatchesは、内部的に次のようなコードを使用できます。

def classifyInBatches(subject: Subject) {
  if (!internals.canAdd(subject)) {
    partialResults.add(internals.processExisting)
  }
  internals.add(subject) // Assumption: at least one will fit.
}

このような動作を許可する Apache Spark で何ができますか?

4

1 に答える 1