作業するバッチを指定すると、基になる関数が大幅に効率的に動作する状況があります。次のような既存のコードがあります。
// subjects: RDD[Subject]
val subjects = Subject.load(job, sparkContext, config)
val classifications = subjects.flatMap(subject => classify(subject)).reduceByKey(_ + _)
classifications.saveAsTextFile(config.output)
このclassify
方法は単一の要素に対して機能しますが、要素のグループに対してより効率的に操作できます。coalesce
RDD をチャンクに分割し、各チャンクをグループとして処理することを検討しましたが、これには 2 つの問題があります。
- マップされた RDD を返す方法がわかりません。
classify
グループの大きさは事前にわからず、入力の内容によって異なります。
理想的な状況で呼び出す方法のサンプル コードclassify
(非常に大きな入力に対してスピルできないため、出力はぎこちなくなります):
def classifyRdd (subjects: RDD[Subject]): RDD[(String, Long)] = {
val classifier = new Classifier
subjects.foreach(subject => classifier.classifyInBatches(subject))
classifier.classifyRemaining
classifier.results
}
この方法でclassifyInBatches
は、内部的に次のようなコードを使用できます。
def classifyInBatches(subject: Subject) {
if (!internals.canAdd(subject)) {
partialResults.add(internals.processExisting)
}
internals.add(subject) // Assumption: at least one will fit.
}
このような動作を許可する Apache Spark で何ができますか?