0

mapreduce のデータフローを理解するのに少し苦労しています。最近、reduce フェーズでディスクのメモリが不足したときに、非常に要求の厳しいジョブがクラッシュしました。ジョブに必要なディスクの量を見積もることは難しいと思います。データフローについて詳しく説明します。

誰かが mapreduce のデータ フローを修正したり、詳細を説明したり、システムの寸法についてアドバイスをくれたりすると助かります。

クラスタ構成:

30 個のスレーブを含むクラスターがあります。

  • 12GBのRAM
  • 100GBのハードディスク
  • 4コア

私のマップ タスクは wordcount に非常に似ているため、必要なメモリはほとんどありません。私の削減タスクは、単語の順列グループで機能します。同じ単語を結合する必要があるため、reduce 関数には常に <= 3GB の一時ハッシュ マップが必要です。

私は 12GB の RAM を持っており、私の Hadoop デーモンは 1GB のヒープ + オペレーティング システム用に 500MB を必要とするため、map/reduce スロットを次のように分割しました。

900MB ヒープの 4 つの map スロット3GB ヒープの 2 つの reduce スロット。マップ スロットは 300 MB を超えるメモリを必要としないためio.sort.mb、マップ フェーズでのメモリ内ソートを改善するために 500 MB に設定しました。

私のジョブには、それぞれ 8 GB のマップ出力を生成する 1800 のマップ タスクがあります。圧縮にBZIP2を使っているので、1GBまで圧縮できます。これは、3 TB のメモリがあるのに、合計マップ出力が 2 TB 未満になることを意味します。

それぞれ 5 GB の出力を生成する 100 個の削減タスクを選択しました。

一見すると、すべてがメモリに収まるはずです。しかし、明らかに、ソート段階では圧縮と解凍が必要であり、コピー段階ではデータが同時に2つの場所にある必要があります(私は推測します)。ここがややこしいところなので、データの流れを完全に理解したいと思います。これは私がそれが機能する方法だと思いますが、間違っている場合は修正してください:

データフロー

マップ タスクは多数(私の場合は 200)のスピルを生成し、メモリ内で並べ替えられ、ローカル ディスクに書き込まれる前に圧縮されます。マップ タスクが完了すると、10 個ごとにマージされる 200 個のスピル ファイルが得られます( io.sort.factor)。これは、10 個のファイルが解凍されることを意味します: 10 x (5MB -> 40MB)、したがって、0.4GB の圧縮/解凍オーバーヘッドが発生します。. 200回の流出が最初のマージラウンドを行った後に何が起こるかはわかりません. reduceタスクごとに最初にシャッフルされると思いますか?したがって、ファイルのサイズはそれほど大きくなりません。これをブラックボックスの観点から見ると、200 個の圧縮されたスピルから開始し、reduce タスク用に 100 個の圧縮ファイル (タスクごとに 1 つ) になることを意味します。

レデューサーは 60 個しかないため、ノードごとに60 個の圧縮ファイルが reducersにコピーされます。これはマップ フェーズで既に行われています。これはおそらく、圧縮ファイルがソースと宛先の両方に一時的に存在することを意味します。これは、この場合、メモリ要件が (一時的に) 1 ノードあたり 160 個の圧縮ファイルに増加することを意味します。これはマップ出力の 1.6 倍です。map 出力は 1800 GBなので、一時的ではありますが 2880 GB になります。したがって、最初の削減フェーズを開始できるはずです。コピー後 (願わくば!)データはマッパーのローカル出力ディレクトリから削除されるため、マップ出力と同じ量のデータが得られ、再び 1800 GB になります。

ここで、レデューサーのソート フェーズが開始されます。マッパーの記憶が消える前に始まらないといいのですが!? 1800 のマップ タスクの出力をマージするため、解凍する必要があります。reduce タスクの入力は、およそ mapoutput / 100 = 18 GB の圧縮データです。ノードごとに 144 GB になるため、一度にすべてを解凍することはできません。また、ジョブがクラッシュしなかったため、解凍は少し賢く実行されます。map フェーズと同じように考えます。10 個のファイル (1800 個のタスク出力のうち) が同時に解凍され、マージされます。解凍により、マージ ラウンドごとに 18GB/180 = 100 MB のオーバーヘッドが発生します。ここでも問題は、最後のマージ ラウンドがどのように発生するかです。、hadoop リファレンスを読んだことを覚えています。ファイルが 1 つだけになるまでレデューサーはマージしません。

reduce フェーズでの並べ替えの後、reduce フェーズが実行され、入力レコードの解凍が必要になりますが、すべての reduce タスクは 500 個の入力キー グループで動作するため、これは実際の問題ではありません。

前述のように、reduce タスクは DFS に対して約 5 GB の出力を生成します (合計で 0.5 TB)。

最初の 60 個の reduce タスクが終了すると、ジョブは本当に問題になります。2 番目のラウンドでは、並べ替えフェーズでタスクがクラッシュし始めます。これは、コピーのオーバーヘッドまたは圧縮解除のオーバーヘッドに関係していると思われます。

正確な例外は次のとおりです。org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3

プログラム フローと mapreduce の理解を十分に詳しく説明できたことを願っています。私は本当に感謝しています:

  • 誰かがコピー段階とマージ段階に関する煙を片付けることができます
  • 仕事のクラッシュを克服するための提案を提供するだけでなく、.
  • 必要なメモリ量を正確に見積もることができれば理想的です。なぜなら、40 ノードのクラスタを 5 日間の運用後に (今回経験したように) クラッシュさせようとすると不快になるからです。近づいています。

前もって感謝します

私のジョブ失敗のスタックトレースは次のとおりです。

例外 1:

org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:510)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:142)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2539)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.access$400(ReduceTask.java:661)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:399)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

例外 2:

FAILEDjava.io.IOException: Task: attempt_201310160819_0001_r_000075_1 - The reduce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/map_1622.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
    at org.apache.hadoop.mapred.MapOutputFile.getInputFileForWrite(MapOutputFile.java:176)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2798)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2762)

例外 3: (おそらく diskchecker 例外が原因)

Task attempt_201310160819_0001_r_000077_1 failed to report status for 2400 seconds. Killing!
4

1 に答える 1