指定されたコンバイナーで MapReduce ジョブを実行する場合、コンバイナーはソート段階で実行されますか? スピルごとにコンバイナーがマッパー出力で実行されることは理解していますが、マージソートの中間ステップで実行することも有益であるように思われます。ここでは、ソートのいくつかの段階で、同等のキーのマッパー出力がある時点でメモリに保持されていると想定しています。
これが現在発生していない場合、特定の理由があるのでしょうか、それとも実装されていないだけですか?
前もって感謝します!
コンバイナーは、ネットワーク帯域幅を節約するためにあります。
mapoutput は直接ソートされます。
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
これは、実際のマッピングが完了した直後に発生します。バッファの反復中に、コンバイナが設定されているかどうかを確認し、設定されている場合はレコードを結合します。そうでない場合は、ディスクに直接スピルします。
重要な部分は にありMapTask
ますので、自分の目で確かめてください。
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
// some fields
for (int i = 0; i < partitions; ++i) {
// check if configured
if (combinerRunner == null) {
// spill directly
} else {
combinerRunner.combine(kvIter, combineCollector);
}
}
これは、出力を転送する必要がある可能性が非常に高いため、ディスク容量とネットワーク帯域幅を節約するための適切な段階です。マージ/シャッフル/ソート フェーズでは、マップの終了時に実行されるコンバイナーと比較して、より多くの量のデータをクランチする必要があるため、有益ではありません。
Web インターフェイスに表示されるソート フェーズは誤解を招くことに注意してください。それは単なる純粋なマージです。
コンバイナーを実行する機会は 2 つあります。どちらも処理のマップ側にあります。(非常に優れたオンライン リファレンスは、Tom White の「Hadoop: The Definitive Guide」からのものです - https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-6/shuffle-and-ソート)
最初の機会は、各パーティションのキーによるメモリ内の並べ替えが完了した後、並べ替えられたデータをディスクに書き込む前のマップ側にあります。この時点でコンバイナーを実行する動機は、最終的にローカル ストレージに書き込まれるデータの量を減らすことです。ここでコンバイナーを実行することで、次のステップでマージおよびソートする必要があるデータの量も削減できます。したがって、投稿された元の質問に対して、はい、コンバイナーはこの初期段階で既に適用されています。
2 番目の機会は、スピル ファイルをマージして並べ替えた直後です。この場合、コンバイナーを実行する動機は、最終的にネットワーク経由でレデューサーに送信されるデータの量を減らすことです。この段階では、このステップで処理されるデータの量がすでに削減されている可能性がある、コンバイナーの以前のアプリケーションの恩恵を受けます。
コンバイナーは、あなたが理解している方法でのみ実行されます。
コンバイナーがこの方法でのみ機能する理由は、レデューサーに送信されるデータの量を減らすためだと思います。これは、多くの状況で大きな利益となります。一方、レデューサーでは、データは既にそこにあり、それらを並べ替え/マージで結合するか、削減ロジックで結合するかは、計算上はそれほど重要ではありません (現在または後で行われます)。
つまり、私のポイントは、マージで言うように組み合わせることで利益が得られる可能性があるということですが、マップ側のコンバイナーほどではありません。
私はコードを調べていませんが、Hadoop を参照してください: Tom White の第 3 版による決定的なガイドでは、コンバイナーが指定されている場合、レデューサーのマージ フェーズ中に実行されることが言及されています。以下は本文からの抜粋です。
" マップの出力が十分に小さい場合、reduce タスク JVM のメモリにコピーされます (バッファのサイズは、この目的に使用するヒープの割合を指定する mapred.job.shuffle.input.buffer.percent によって制御されます)。メモリ内バッファがしきい値サイズ (mapred.job.shuffle.merge.percent で制御) に達するか、マップ出力のしきい値数 (mapred.inmem.merge.threshold) に達すると、コンバイナが指定されている場合、ディスクに書き込まれるデータの量を減らすために、コンバイナがマージ中に実行されます。