2

私は (Scala API を介して) Spark で LDA モデルを実装し、さまざまな数のトピックでモデルをテストしています。一般的には問題なく動作しているように見えますが、メモリの問題に関連していると確信している断続的なタスク エラーが発生します。私の現在のコードの関連部分は以下の通りです。

各ドキュメントがスパース mllib ベクトルである RDD のテキスト ダンプからデータをロードしていることに注意してください。したがって、私のファイルの例の行はLDA_vectors次のようになります。

(7066346,(112312,[1,3,5,7,...],[2.0,57.0,10.0,2.0,...]))

これは標準の mllib スパース形式であり、次のように読むことができます。

(document_id,(vocabulary_size,[term_id_1, term_id_2...], [term_1_freq,term_2, freq,...]))

したがって、parse関数はそれを RDD に読み込む処理を行います。

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import java.io._

def parse(rdd: RDD[String]): RDD[(Long, Vector)] = {
  val pattern: scala.util.matching.Regex = "\\(([0-9]+),(.*)\\)".r
  rdd .map{
  case pattern(k, v) => (k.toLong, Vectors.parse(v))
  }
 }

val text = sc.textFile("/path/to/LDA_vectors")
val docsWithFeatures = parse(text).repartition(192).cache()

そして、さまざまな数のトピックでループを実行します。Word ドキュメント マトリックスをファイルに保存するためのコード ブロックは、ここで説明する方法に従っていることに注意してください。

for (n_topics <- Range(10,301,5) ){

  val s = n_topics.toString
  val lda = new LDA().setK(n_topics).setMaxIterations(20) 
  val ldaModel = lda.run(docsWithFeatures)
  val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]

  // write some model info to file
  val outfile = new File(s"model_summary_$s")
  @transient val bw = new BufferedWriter(new FileWriter(outfile))
  bw.write("topicConcentration:"+"\t"+distLDAModel.topicConcentration+"\n")
  bw.write("docConcentration:"+"\t"+distLDAModel.docConcentration(0)+"\n")
  bw.write("LL:"+"\t"+distLDAModel.logLikelihood+"\n")
  bw.close()

  // convert to Distributed model so we can get our topic data out 
  val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]

  // Save the document-topic matrix to file
  distLDAModel.topicDistributions.saveAsTextFile(s"doc_topic_$s")

  // this saves the word-topic matrix to file
  val topic_mat = distLDAModel.topicsMatrix
  val localMatrix: List[Array[Double]] = topic_mat.transpose.toArray.grouped(topic_mat.numCols).toList
  val lines: List[String] = localMatrix.map(line => line.mkString(" "))
  val outfile2 = new File(s"artist_topic_$s")
  @transient val bw2 = new BufferedWriter(new FileWriter(outfile2))
  for (line <- lines) bw2.write(line+"\n")
  bw2.close()
}

Ok。したがって、これはすべて正常に機能しますが、前述したように、タスクの失敗に遭遇し始め、トピックの数を増やす可能性が徐々に高くなりました. そして、これらはメモリの問題が原因であると思われるため、spark で LDA のパフォーマンスを調整するにはどうすればよいか疑問に思います。

私は Google Cloud Dataproc で実行しているため、リソースは柔軟ですが、ここでパフォーマンスを最適化する最善の方法を知るには、Spark の LDA の内部を十分に理解していないことに気付きました。

これまでの私の試みは、次の行で行うことです。

val docsWithFeatures = parse(text).repartition(192).cache()

ここでは、ドキュメントの RDD を 192 個のパーティションに再分割し (この例では、48 個のコアで spark を実行していたので、4*n_cores の経験則を使用しました)、それをキャッシュします。これは、たとえば、RDD でマップを繰り返し実行していた場合には妥当ですが、ここでのパフォーマンスに役立つかどうか、またはどのように役立つかはわかりません。ここで他に何ができますか?

回答を容易にするために、ここに私のコーパスの要約統計を示します。

  • ドキュメント: 166,784
  • 語彙数(固有用語数):112,312
  • 総トークン数: 4,430,237,213

おそらく、トークンの数が多いことがここでの主な問題であり、タスクごとのメモリを増やす必要があるだけです。つまり、使用可能なメモリを増やしてエグゼキュータの数を減らすことです。しかしもちろん、これは、Spark LDA が内部でどの程度正確に機能しているかによって異なります。たとえば、私の以前の質問hereを参照してください。

4

0 に答える 0