0

私は10個のファイルと10個のファイルすべての合計である1つの大きなファイルを言うファイルのセットを持っています。

私はそれらを分散キャッシュ、jobconfに広告します。

私がreduceでそれらを読むとき、私は次のことを観察します:

  1. reduceメソッドの分散キャッシュに追加された選択したファイルのみを読み取ります。すべてのreduceメソッドで大きなファイルを読み取る場合と比較して、各reduceで読み取られるファイルサイズが小さいため、速度が速くなると予想しました。しかし、それは遅かった。

  2. また、それをさらに小さなファイルに分割して分散キャッシュに追加すると、問題はさらに悪化しました。ジョブ自体は、久しぶりに実行を開始しました。

理由がわかりません。plsは役立ちます。

4

1 に答える 1

3

あなたの問題は、reduce() でファイルを読み取ることにあると思います。configure() (古い API を使用) または setup() (新しい API を使用) でファイルを読み取る必要があります。したがって、すべてのレデューサーに対して、レデューサーへの入力グループごとに読み取るのではなく、1 回だけ読み取られます (基本的には、reduce メソッドの呼び出しごと)。

次のようなものを書くことができます: 新しい mapreduce API (org.apache.hadoop.mapreduce.*) を使用する -

    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

    ...
Path file1;
Path file2;
...

    @Override
            protected void setup(Context context) throws IOException, InterruptedException {

                // Get the file from distributed cached
    file1 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];
    file2 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[1];

                // parse the file and get it's data in-memory for use in reduce method, probably in some ArrayList or HashMap.
            }



            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                    InterruptedException {
    ...
    }
    }

古い mapred API (org.apache.hadoop.mapred.*) を使用する -

public static class ReduceJob extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    ...
Path file1;
Path file2;
...

        @Override
        public void configure(JobConf job) {

                // Get the file from distributed cached
    file1 = DistributedCache.getLocalCacheFiles(job)[0]
    file2 = DistributedCache.getLocalCacheFiles(job)[1]
...

                // parse the file and get it's data in-memory for use in reduce method, probably in some ArrayList or HashMap.
            }


@Override
        public synchronized void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
    ...
    }
    }
于 2012-11-02T21:01:17.423 に答える