私は (Scala API を介して) Spark で LDA モデルを実装し、さまざまな数のトピックでモデルをテストしています。一般的には問題なく動作しているように見えますが、メモリの問題に関連していると確信している断続的なタスク エラーが発生します。私の現在のコードの関連部分は以下の通りです。
各ドキュメントがスパース mllib ベクトルである RDD のテキスト ダンプからデータをロードしていることに注意してください。したがって、私のファイルの例の行はLDA_vectors
次のようになります。
(7066346,(112312,[1,3,5,7,...],[2.0,57.0,10.0,2.0,...]))
これは標準の mllib スパース形式であり、次のように読むことができます。
(document_id,(vocabulary_size,[term_id_1, term_id_2...], [term_1_freq,term_2, freq,...]))
したがって、parse
関数はそれを RDD に読み込む処理を行います。
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import java.io._
def parse(rdd: RDD[String]): RDD[(Long, Vector)] = {
val pattern: scala.util.matching.Regex = "\\(([0-9]+),(.*)\\)".r
rdd .map{
case pattern(k, v) => (k.toLong, Vectors.parse(v))
}
}
val text = sc.textFile("/path/to/LDA_vectors")
val docsWithFeatures = parse(text).repartition(192).cache()
そして、さまざまな数のトピックでループを実行します。Word ドキュメント マトリックスをファイルに保存するためのコード ブロックは、ここで説明する方法に従っていることに注意してください。
for (n_topics <- Range(10,301,5) ){
val s = n_topics.toString
val lda = new LDA().setK(n_topics).setMaxIterations(20)
val ldaModel = lda.run(docsWithFeatures)
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
// write some model info to file
val outfile = new File(s"model_summary_$s")
@transient val bw = new BufferedWriter(new FileWriter(outfile))
bw.write("topicConcentration:"+"\t"+distLDAModel.topicConcentration+"\n")
bw.write("docConcentration:"+"\t"+distLDAModel.docConcentration(0)+"\n")
bw.write("LL:"+"\t"+distLDAModel.logLikelihood+"\n")
bw.close()
// convert to Distributed model so we can get our topic data out
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
// Save the document-topic matrix to file
distLDAModel.topicDistributions.saveAsTextFile(s"doc_topic_$s")
// this saves the word-topic matrix to file
val topic_mat = distLDAModel.topicsMatrix
val localMatrix: List[Array[Double]] = topic_mat.transpose.toArray.grouped(topic_mat.numCols).toList
val lines: List[String] = localMatrix.map(line => line.mkString(" "))
val outfile2 = new File(s"artist_topic_$s")
@transient val bw2 = new BufferedWriter(new FileWriter(outfile2))
for (line <- lines) bw2.write(line+"\n")
bw2.close()
}
Ok。したがって、これはすべて正常に機能しますが、前述したように、タスクの失敗に遭遇し始め、トピックの数を増やす可能性が徐々に高くなりました. そして、これらはメモリの問題が原因であると思われるため、spark で LDA のパフォーマンスを調整するにはどうすればよいか疑問に思います。
私は Google Cloud Dataproc で実行しているため、リソースは柔軟ですが、ここでパフォーマンスを最適化する最善の方法を知るには、Spark の LDA の内部を十分に理解していないことに気付きました。
これまでの私の試みは、次の行で行うことです。
val docsWithFeatures = parse(text).repartition(192).cache()
ここでは、ドキュメントの RDD を 192 個のパーティションに再分割し (この例では、48 個のコアで spark を実行していたので、4*n_cores の経験則を使用しました)、それをキャッシュします。これは、たとえば、RDD でマップを繰り返し実行していた場合には妥当ですが、ここでのパフォーマンスに役立つかどうか、またはどのように役立つかはわかりません。ここで他に何ができますか?
回答を容易にするために、ここに私のコーパスの要約統計を示します。
- ドキュメント: 166,784
- 語彙数(固有用語数):112,312
- 総トークン数: 4,430,237,213
おそらく、トークンの数が多いことがここでの主な問題であり、タスクごとのメモリを増やす必要があるだけです。つまり、使用可能なメモリを増やしてエグゼキュータの数を減らすことです。しかしもちろん、これは、Spark LDA が内部でどの程度正確に機能しているかによって異なります。たとえば、私の以前の質問hereを参照してください。