mapreduce のデータフローを理解するのに少し苦労しています。最近、reduce フェーズでディスクのメモリが不足したときに、非常に要求の厳しいジョブがクラッシュしました。ジョブに必要なディスクの量を見積もることは難しいと思います。データフローについて詳しく説明します。
誰かが mapreduce のデータ フローを修正したり、詳細を説明したり、システムの寸法についてアドバイスをくれたりすると助かります。
クラスタ構成:
30 個のスレーブを含むクラスターがあります。
- 12GBのRAM
- 100GBのハードディスク
- 4コア
私のマップ タスクは wordcount に非常に似ているため、必要なメモリはほとんどありません。私の削減タスクは、単語の順列グループで機能します。同じ単語を結合する必要があるため、reduce 関数には常に <= 3GB の一時ハッシュ マップが必要です。
私は 12GB の RAM を持っており、私の Hadoop デーモンは 1GB のヒープ + オペレーティング システム用に 500MB を必要とするため、map/reduce スロットを次のように分割しました。
900MB ヒープの 4 つの map スロットと3GB ヒープの 2 つの reduce スロット。マップ スロットは 300 MB を超えるメモリを必要としないためio.sort.mb
、マップ フェーズでのメモリ内ソートを改善するために 500 MB に設定しました。
私のジョブには、それぞれ 8 GB のマップ出力を生成する 1800 のマップ タスクがあります。圧縮にBZIP2を使っているので、1GBまで圧縮できます。これは、3 TB のメモリがあるのに、合計マップ出力が 2 TB 未満になることを意味します。
それぞれ 5 GB の出力を生成する 100 個の削減タスクを選択しました。
一見すると、すべてがメモリに収まるはずです。しかし、明らかに、ソート段階では圧縮と解凍が必要であり、コピー段階ではデータが同時に2つの場所にある必要があります(私は推測します)。ここがややこしいところなので、データの流れを完全に理解したいと思います。これは私がそれが機能する方法だと思いますが、間違っている場合は修正してください:
データフロー
マップ タスクは多数(私の場合は 200)のスピルを生成し、メモリ内で並べ替えられ、ローカル ディスクに書き込まれる前に圧縮されます。マップ タスクが完了すると、10 個ごとにマージされる 200 個のスピル ファイルが得られます( io.sort.factor
)。これは、10 個のファイルが解凍されることを意味します: 10 x (5MB -> 40MB)、したがって、0.4GB の圧縮/解凍オーバーヘッドが発生します。. 200回の流出が最初のマージラウンドを行った後に何が起こるかはわかりません. reduceタスクごとに最初にシャッフルされると思いますか?したがって、ファイルのサイズはそれほど大きくなりません。これをブラックボックスの観点から見ると、200 個の圧縮されたスピルから開始し、reduce タスク用に 100 個の圧縮ファイル (タスクごとに 1 つ) になることを意味します。
レデューサーは 60 個しかないため、ノードごとに60 個の圧縮ファイルが reducersにコピーされます。これはマップ フェーズで既に行われています。これはおそらく、圧縮ファイルがソースと宛先の両方に一時的に存在することを意味します。これは、この場合、メモリ要件が (一時的に) 1 ノードあたり 160 個の圧縮ファイルに増加することを意味します。これはマップ出力の 1.6 倍です。map 出力は 1800 GBなので、一時的ではありますが 2880 GB になります。したがって、最初の削減フェーズを開始できるはずです。コピー後 (願わくば!)データはマッパーのローカル出力ディレクトリから削除されるため、マップ出力と同じ量のデータが得られ、再び 1800 GB になります。
ここで、レデューサーのソート フェーズが開始されます。マッパーの記憶が消える前に始まらないといいのですが!? 1800 のマップ タスクの出力をマージするため、解凍する必要があります。reduce タスクの入力は、およそ mapoutput / 100 = 18 GB の圧縮データです。ノードごとに 144 GB になるため、一度にすべてを解凍することはできません。また、ジョブがクラッシュしなかったため、解凍は少し賢く実行されます。map フェーズと同じように考えます。10 個のファイル (1800 個のタスク出力のうち) が同時に解凍され、マージされます。解凍により、マージ ラウンドごとに 18GB/180 = 100 MB のオーバーヘッドが発生します。ここでも問題は、最後のマージ ラウンドがどのように発生するかです。、hadoop リファレンスを読んだことを覚えています。ファイルが 1 つだけになるまでレデューサーはマージしません。
reduce フェーズでの並べ替えの後、reduce フェーズが実行され、入力レコードの解凍が必要になりますが、すべての reduce タスクは 500 個の入力キー グループで動作するため、これは実際の問題ではありません。
前述のように、reduce タスクは DFS に対して約 5 GB の出力を生成します (合計で 0.5 TB)。
最初の 60 個の reduce タスクが終了すると、ジョブは本当に問題になります。2 番目のラウンドでは、並べ替えフェーズでタスクがクラッシュし始めます。これは、コピーのオーバーヘッドまたは圧縮解除のオーバーヘッドに関係していると思われます。
正確な例外は次のとおりです。org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
プログラム フローと mapreduce の理解を十分に詳しく説明できたことを願っています。私は本当に感謝しています:
- 誰かがコピー段階とマージ段階に関する煙を片付けることができます
- 仕事のクラッシュを克服するための提案を提供するだけでなく、.
- 必要なメモリ量を正確に見積もることができれば理想的です。なぜなら、40 ノードのクラスタを 5 日間の運用後に (今回経験したように) クラッシュさせようとすると不快になるからです。近づいています。
前もって感謝します
私のジョブ失敗のスタックトレースは次のとおりです。
例外 1:
org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:510)
at org.apache.hadoop.mapred.Merger.merge(Merger.java:142)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2539)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.access$400(ReduceTask.java:661)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:399)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
例外 2:
FAILEDjava.io.IOException: Task: attempt_201310160819_0001_r_000075_1 - The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/map_1622.out
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.MapOutputFile.getInputFileForWrite(MapOutputFile.java:176)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2798)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2762)
例外 3: (おそらく diskchecker 例外が原因)
Task attempt_201310160819_0001_r_000077_1 failed to report status for 2400 seconds. Killing!