3

2 つの Map reduce ジョブをチェーンしました。Job1 にはレデューサーが 1 つしかなく、浮動小数点値を計算しています。この値を Job2 のレデューサーで使用したいと考えています。これが私の主なメソッド設定です。

public static String GlobalVriable;
public static void main(String[] args) throws Exception {

        int runs = 0;
        for (; runs < 10; runs++) {
            String inputPath = "part-r-000" + nf.format(runs);
            String outputPath = "part-r-000" + nf.format(runs + 1);
            MyProgram.MR1(inputPath);
            MyProgram.MR2(inputPath, outputPath);
        }
    }

    public static void MR1(String inputPath)
            throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        conf.set("var1","");
        Job job = new Job(conf, "This is job1");
        job.setJarByClass(MyProgram.class);
        job.setMapperClass(MyMapper1.class);
        job.setReducerClass(MyReduce1.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        job.waitForCompletion(true);
        GlobalVriable = conf.get("var1"); // I am getting NULL here
    }

    public static void MR2(String inputPath, String outputPath)
            throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "This is job2");
        ...
    }

    public static class MyReduce1 extends
        Reducer<Text, FloatWritable, Text, FloatWritable> {

    public void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {

        float s = 0;
        for (FloatWritable val : values) {
            s += val.get();
        }

        String sum = Float.toString(s);
        context.getConfiguration().set("var1", sum);
    }
}

ご覧のとおり、プログラム全体を複数回繰り返す必要があります。私の Job1 は、入力から単一の数値を計算しています。それは単一の数値であり、多くの反復であるため、HDFS に書き込んで読み取りたくはありません。Myreducer1 で計算された値を共有して Myreducer2 で使用する方法はありますか。

更新: conf.set & conf.get を使用して値を渡そうとしました。値が渡されていません。

4

3 に答える 3

5

カウンターを介して浮動小数点値を返す方法は次のとおりです...

最初に、最初のレデューサーで、1000 を掛けて (たとえば、3 桁の精度を維持するために) float 値を long に変換し、結果をカウンターに入れます。

public void cleanup(Context context) {

    long result = (long) (floatValue * 1000);
    context.getCounter("Result","Result").increment(result); 

}

ドライバー クラスで、long 値を取得し、それを float に変換します。

public static void MR1(String inputPath)
        throws IOException, InterruptedException, ClassNotFoundException {

    Configuration conf = new Configuration();
    Job job = new Job(conf, "This is job1");
    job.setJarByClass(MyProgram.class);
    job.setMapperClass(MyMapper1.class);
    job.setReducerClass(MyReduce1.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    FileInputFormat.addInputPath(job, new Path(inputPath));
    job.waitForCompletion(true);

    long result = job.getCounters().findCounter("Result","Result").getValue();
    float value = ((float)result) / 1000;

}
于 2012-12-01T21:24:11.330 に答える
1

これにはZooKeeperを使用できます。このようなジョブ間の調整やメッセージ パッシングに最適です。

于 2012-12-01T01:20:18.973 に答える
0

戻り値の型をMR1to int(または適切なデータ型) に変更して、計算した数値を返すことはできませんか?

    int myNumber = MyProgram.MR1(inputPath);

次に、パラメーターを追加MR2して、計算された数値で呼び出します。

    MyProgram.MR2(inputPath, outputPath, myNumber);
于 2012-12-01T00:27:00.360 に答える