Chris Smith がこの質問に答えて、SO に投稿できると言いました。彼の答え:
したがって、入力データのサイズ自体は EMR の制限ではありません。他にもたくさんの要因があります。
とはいえ、10 TB のデータを吸い込むのは骨の折れる作業です。それだけの量のデータを読み取るだけでも非常に残忍であり、バケット化/ソートが行われます。
最初の質問は、制約要因は何ですか? ネットワーク帯域幅が限界に達していませんか? CPUが限界に達していませんか?ディスク I/O か iops か? これらはデータノードでどのように見えますか? JobTracker と NameNodes についてはどうですか (クラスターの残りの部分に問題がないのに、これらを最大にするのは珍しいことではありません)。上記のいずれでもない場合は、Hadoop リソースが限界に達している可能性があり、別の構成が必要です。
どの段階にあるかを超えた競合の特定の側面について言及していないので、その下で何が起こっているかについての測定基準をあまり持っていないのではないかと思います. 通常、大きなジョブを適切に調整する前に、「測定してから調整する」を数回繰り返す必要があります。
一般的な経験則として、「リデュース/コピー」フェーズに長時間拘束されることは、「やり方が間違っている」ことを示すかなり強力な指標です。通常、問題は、ノードが何らかの方法でディスク IO を最大化して、ソート/スピル/マージ プロセスに巻き込まれていることです。Hadoop には多数のチューニング パラメーターがあり、多数のマッパーとリデューサーを使用するジョブでは、特に 2 つの間に大きな不均衡がある場合に、おかしくなり始めます。ここでも Karmasphere や同様のツールが大いに役立ちます。微調整が必要な典型的なもの (名前の一部が間違っている可能性があります):
ロギング。特に、dfs.namenode.logging.level のようなものは、ジョブの前に微調整することが重要になる場合があります。詳細ログで自分を殺すことは完全に可能です。逆説的ですが、それがあなたの救いにもなるので…
マップの出力サイズは通常、「縮小/コピー」の問題の重要な要素です。可能であれば、マップの出力サイズを小さくする方法を検討してください。本当に_マップの入力サイズよりもはるかに小さくする必要があります。reduce フェーズで厳密に必要とされないデータを取り除きます。プロトコル バッファや倹約 (整数データの大勝利) のようなコンパクトなバイナリ シリアライゼーション形式 (Java シリアライゼーションはパフォーマンスを低下させます) の使用を検討してください。ID/列挙型で文字列をどの程度表現できるかを検討してください。コンバイナーを使用して、ネットワーク経由で送信する必要があるデータの量を削減できますか? CPU に余裕がある場合は、圧縮を使用します (lzo または snappy から始めますが、まだ CPU を燃やす必要がある場合は、gzip またはさらに強力なものを検討してください)。マップ タスク ログでまだ結合ステップに時間がかかっていることがわかる場合は、微調整を行う必要があります。
io.sort.factor: おそらくもっと高いはずです。何をしているかによっては、マッパーが多すぎることに悩まされることさえあるかもしれません。io.sort.mb: io.sort.factor と密接に関連していますが、異なります。ノードで多くのディスク I/O ストレスが発生し始めたら、これを上げます。これはメモリを大量に消費するため、このパラメータには実際のトレードオフがあります。
mapred.job.reuse.jvm.num.tasks: タスクが本当に小さくなった場合のみ、しかしそうであれば、これを押し上げる価値がありますこの数を増やすために。バランスをとるために、おそらく他の数値を微調整する必要があるでしょう。
io.sort.record.percent: これは、ジョブのサイズが原因で完全に的外れになる可能性が最も低いものです。通常、これが間違っている場合は、レコードが非常に大きいか、または非常に小さいためです。目指す黄金比は「16/(16+1レコードあたりのバイト数)」です。
早期のスピルがノードのパフォーマンスに与える影響を強調することは困難です。こぼした場合、それはデータが書き出され、再度読み取られ、再度書き出されることを意味します。各ノードで。したがって、これを間違えると、ノードを追加しても役に立ちません (実際には悪化する可能性があります)。ジョブでスピルされたレコード数と出力されたマップ レコード数を比較したいとします。理想的には、これらの数値は同じになります。現在、スピルする必要がある場合は、スピルする必要があります (ただし、これは多くの場合、何か間違ったことをしている兆候です) が、レコードごとに 1 回だけディスクにスピルするジョブは、他のジョブを押しつぶすだけです。
レデューサー側でも同様の問題が発生する可能性があります。マージ フェーズのカウンターを見てください。理想的には、こぼれたレコードを 0 にするか、少なくとも <= レデューサー入力レコードの数にする必要があります。それが高い場合...それがパフォーマンスの問題を抱えている理由です(真剣に、これは絶対に残忍です)。さまざまなリデューサー スピル設定に注意してください: mapred.job.shuffle.input.buffer.percent、mapred.job.shuffle.merge.percent、mapred.inmem.merge.threshold、io.sort.factor。通常、大きなジョブで失敗するのは mapred.inmem.merge.threshold です。最初の 2 つもよく失敗しますが、それは仕事の規模というよりも、仕事の性質の関数として起こります。
dfs.namenode.handler.count: HDFS で多数の小さなファイルを生成している場合は、間違いなくこれをプッシュする必要があります
dfs.mapred.job.tracker.handler.count: これを高くする必要がある場合は、アイデアを得るために必要なタスクの数を見てください。数百のノードで実行される数千の小さなタスクを作成している場合、これが 10 であることには満足できません。
dfs.datanode.handler.count: これは、parallel.copies フラグと連動します。私の最初の本能はそれを非常に高くランプアップすることであり、それから私は他の場所でログジャムを作成するだけなので、これはいつも私を困らせます. ;-) とにかく、何人のマッパーが何人のレデューサーと話しているかを考えると、これをかなりブーストすることはおそらく理にかなっています。
tasktracker.http.threads: これは、reduce-copy でスタックしている場合に問題になる可能性は低くなります。とにかくあるべき場所に近づいています。mapred.local.dir: これは、大規模なマップ出力を伴うジョブのために非 EMR クラスターで頻繁に微調整しなければならなかったものです。実際にディスクとディスク容量が制限される可能性があるため、パスをドライブごとに 1 つずつカンマで区切られたディレクトリのリストに変更すると役立つことがわかりました。もちろん、EMR の場合は意味がありませんが、ディスク容量が実際にすぐに不足する可能性があることを示しています。
mapred.local.dir.minspacestart: 気付いていないかもしれませんが、マップ出力用のスペースが不足している可能性があります。この値を調整して、ジョブを開始する前に各タスクがシステム上に十分なスペースを確保できるようにすると、ベーコンを節約できます。
Hadoop は実際には、スピンドルあたり 2 コアのシステム用に設計されており (これはムーアの法則の数回前の繰り返しでした)、すべての入力と出力が HDFS 内に留まり (これにより、入力と出力のショートカットが大量に可能になります)、1GigE 8 コアあたりのポートであり、スイッチ ファブリックのボトルネックはほとんどありません。EMR ではそのようなことは何もありません。Amazon はそれを調整するためにいくつかの適切なデフォルトを提供しようとしていますが、すべての人にとって一般的に問題を解決することは困難です. EMR の 1 つの利点は、ノードごとに大量の RAM を取得する傾向があることです。そのため、ディスク I/O を最小限に抑えるために、RAM を最適に使用するように時間を費やす必要があります。Hadoop は、マッパーが大量の生データを消費するが、吐き出すデータが比較的少ないジョブにも真価を発揮します。各ジョブで生成するすべてのデータに対して、大規模な分散ソートが行われます。Hadoop はデフォルトで、RAM とディスク領域の大部分をタスクに使用できるようにしながら、それを実行しようとします。データがすでにバケット化/ソートされていると、多くの作業がレデューサーからマッパーにプッシュされ、大量のオーバーヘッドが回避されます。おそらく、これがあなたの問題です。