2

MapReduceフレームワークを使用してJavaでHadoopアプリケーションを作成しています。

入力と出力の両方にテキストキーと値のみを使用します。コンピュターを使用して、最終出力に還元する前に追加の計算ステップを実行します。

しかし、キーが同じレデューサーに移動しないという問題があります。コンバイナーで次のようなキーと値のペアを作成して追加します。

public static class Step4Combiner extends Reducer<Text,Text,Text,Text> {
    private static Text key0 = new Text();
    private static Text key1 = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                key0.set("KeyOne");
                key1.set("KeyTwo");
                context.write(key0, new Text("some value"));
                context.write(key1, new Text("some other value"));
        }

}   

public static class Step4Reducer extends Reducer<Text,Text,Text,Text> {

            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                System.out.print("Key:" + key.toString() + " Value: ");
                String theOutput = "";
                for (Text val : values) {
                    System.out.print("," + val);
                }
                System.out.print("\n");

                context.write(key, new Text(theOutput));
            }

}

メインでは、私は次のようなジョブを作成します:

Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

Job job4 = new Job(conf, "Step 4");
job4.setJarByClass(Step4.class);

job4.setMapperClass(Step4.Step4Mapper.class);
job4.setCombinerClass(Step4.Step4Combiner.class);
job4.setReducerClass(Step4.Step4Reducer.class);

job4.setInputFormatClass(TextInputFormat.class);
job4.setOutputKeyClass(Text.class);
job4.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job4, new Path(outputPath));
FileOutputFormat.setOutputPath(job4, new Path(finalOutputPath));            

System.exit(job4.waitForCompletion(true) ? 0 : 1);

レデューサーから出力されるstdoutの出力は次のとおりです。

Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value
Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value
Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value

キーが同じであるため、これは意味がありません。したがって、Iterableに同じ値が3つある2つのレデューサーである必要があります。

あなたが私がこれの底に到達するのを手伝ってくれることを願っています:)

4

2 に答える 2

4

これはおそらく、コンバイナーがマップフェーズとリデュースフェーズの両方で実行されているためです(あまり知られていない「機能」)。

基本的に、コンバイナーでキーを修正します。これは、マップ出力がレデューサーでマージされるときに実行される場合と実行されない場合があります。コンバイナーが実行された後(reduce側)、キーはグループ化コンパレーターを介して供給され、Iterableがreduceメソッドに渡された値を決定します(ここではreduceフェーズのストリーミングの側面を回避しています-iterableはバックアップされていません値のセットまたはリストによって、グループ化コンパレータが現在のキーと最後のキーが同じであると判断した場合、iterator()。next()への呼び出しがさらにtrueを返します)

コンテキストを調べることで、現在のコンバイナーフェーズ側(マップまたはリデュース)を検出することができます(Context.getTaskAttempt().isMap()方法はありますが、これにも問題があるという記憶があり、これに関するJIRAチケットがどこかにある可能性もあります)。

結論として、コンバイナーがリデュース側を実行している場合にこの動作をバイパスすることができない限り、コンバイナーのキーを修正しないでください。

編集@Amarのコメントを調査して、いくつかの冗長なコンパレータ、コンバイナ、レデューサーなどを追加するコード( pastebin link )を まとめました。単一のマップジョブを実行すると、reduceフェーズではコンバイナは実行されず、マップ出力は実行されません。すでにソートされていると想定されているため、再度ソートされます。

コンバイナクラスに送信される前にソートされるため、ソートされていると想定され、キーはそのままで出力されると想定されているため、引き続きソートされます。コンバイナは、特定のキーの値を結合することを目的としていることを忘れないでください。

したがって、単一のマップと指定されたコンバイナーを使用すると、レデューサーはKeyOne、KeyTwo、KeyOne、KeyTwo、KeyOneの順序でキーを確認します。グループ化コンパレータはそれらの間の遷移を確認するため、reduce関数を6回呼び出します。

2つのマッパーを使用する場合、レデューサーは2つの並べ替えられたセグメント(各マップから1つ)があることを認識しているため、削減する前にそれらを並べ替える必要があります-ただし、セグメントの数がしきい値を下回っているため、並べ替えは次のように行われます。インラインストリームソート(ここでも、セグメントはソートされていると見なされます)。2つのマッパー(reduceフェーズから出力された10レコード)では、まだ間違った出力になります。

繰り返しになりますが、コンバイナーのキーを修正しないでください。これは、コンバイナーの目的ではありません。

于 2013-01-02T21:04:32.747 に答える
0

代わりに、コンバイナーでこれを試してください。

context.write(new Text("KeyOne"), new Text("some value"));
context.write(new Text("KeyTwo"), new Text("some other value"));

そのようなことが起こっているのを私が見る唯一の方法は、key0あるコンバイナーからのコンバイナーが別のコンバイナーからのコンバイナーと等しいことが見つからない場合key0です。キーがまったく同じインスタンスを指している場合にどのように動作するかはわかりません(これは、キーを静的にした場合に発生します)。

于 2013-01-02T19:55:45.493 に答える