1

これは非常に特殊なケースかもしれませんが、しばらく頭を悩ませた後、Stackoverflow コミュニティの助けを借りようと思いました。

大規模なデータ セット (大規模なシステムからの 1 日分のデータ) の逆インデックスを構築しています。転置インデックスの構築は、Hadoop 上の map reduce ジョブとして実行されます。逆索引は、scala を使用して作成されます。転置インデックスの構造は次のとおりです。{key:"New", ProductID:[1,2,3,4,5,...]}これらは avro ファイルに書き込まれます。

このプロセス中に、Java ヒープ サイズの問題が発生します。その理由は、上で示した「New」のような用語には、多数の productId(s) が含まれているためだと思います。私のScalaコードで問題が発生する可能性がある大まかな考えがあります:

  def toIndexedRecord(ids: List[Long], token: String): IndexRecord = {
    val javaList = ids.map(l => l: java.lang.Long).asJava //need to convert from scala long to java long
    new IndexRecord(token, javaList)
  }

そして、これが私がこの方法を使用する方法です(多くの場所で使用されますが、同じコード構造とログインが使用されます)

  val titles = textPipeDump.map(vf => (vf.itemId, normalizer.customNormalizer(vf.title + " " + vf.subTitle).trim))
    .flatMap {
    case (id, title) =>
      val ss = title.split("\\s+")
      ss.map(word => (word, List(id)))
  }
    .filter(f => f._2.nonEmpty)
    .group
    .sum
    .map {
    case (token, ids) =>
      toIndexedRecord(ids, token)
  } 

textPipeDumpMultipleTextLineフィールドオブジェクトをやけどしている

case class MultipleTextLineFiles(p : String*) extends FixedPathSource(p:_*) with TextLineScheme

そのテキスト行から必要なフィールドを分割して取得するケースクラスがあり、それがオブジェクトですss

ここに私のスタックトレースがあります:

Exception in thread "IPC Client (47) connection to /127.0.0.1:55977 from job_201306241658_232590" java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.IOUtils.closeStream(IOUtils.java:226)
    at org.apache.hadoop.ipc.Client$Connection.close(Client.java:903)
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:800)
28079664 [main] ERROR cascading.flow.stream.TrapHandler - caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [WritableSequenceFile(h...][com.twitter.scalding.GroupBuilder$$anonfun$1.apply(GroupBuilder.scala:189)] operator Every failed executing operation: MRMAggregator[decl:'value']
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
    at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:520)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1178)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
    at scala.collection.immutable.List.$plus$plus(List.scala:193)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:86)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:84)
    at com.twitter.scalding.KeyedList$$anonfun$sum$1.apply(TypedPipe.scala:264)
    at com.twitter.scalding.MRMAggregator.aggregate(Operations.scala:279)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)
    ... 12 more

小さなデータ セットに対してマップ削減ジョブを実行すると、エラーが発生しません。つまり、データが増加すると、New や old などの単語のインデックスを作成する items/product_id の数が大きくなり、ヒープ サイズがオーバーフローすることになります。

したがって、問題は、Java ヒープ サイズのオーバーフローを回避し、このタスクを達成する方法です。

4

0 に答える 0