0

ファイル内のすべての行に対して一意のキーを生成するシナリオがあります。タイムスタンプ列がありますが、いくつかのシナリオで同じタイムスタンプに複数の行を使用できます。

以下のプログラムで説明されているように、一意の値にそれぞれのカウントが付加されたタイムスタンプを付けることにしました。

マッパーは、タイムスタンプをキーとして出力し、行全体をその値として出力し、リデューサーでキーが生成されます。

問題は Map が約 236 行を出力することです。そのうち 230 レコードのみが、同じ 230 レコードを出力するレデューサーの入力として供給されます。

public class UniqueKeyGenerator extends Configured implements Tool {

    private static final String SEPERATOR = "\t";
    private static final int TIME_INDEX = 10;
    private static final String COUNT_FORMAT_DIGITS = "%010d";

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text row, Context context)
                throws IOException, InterruptedException {
            String input = row.toString();
            String[] vals = input.split(SEPERATOR);
            if (vals != null && vals.length >= TIME_INDEX) {
                context.write(new Text(vals[TIME_INDEX - 1]), row);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {

        @Override
        protected void reduce(Text eventTimeKey,
                Iterable<Text> timeGroupedRows, Context context)
                throws IOException, InterruptedException {
            int cnt = 1;
            final String eventTime = eventTimeKey.toString();
            for (Text val : timeGroupedRows) {
                final String res = SEPERATOR.concat(getDate(
                        Long.valueOf(eventTime)).concat(
                        String.format(COUNT_FORMAT_DIGITS, cnt)));
                val.append(res.getBytes(), 0, res.length());
                cnt++;
                context.write(NullWritable.get(), val);
            }
        }
    }

    public static String getDate(long time) {
        SimpleDateFormat utcSdf = new SimpleDateFormat("yyyyMMddhhmmss");
        utcSdf.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"));
        return utcSdf.format(new Date(time));
    }

    public int run(String[] args) throws Exception {
        conf(args);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        conf(args);
    }

    private static void conf(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "uniquekeygen");
        job.setJarByClass(UniqueKeyGenerator.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // job.setNumReduceTasks(400);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

}

行数が多くても一貫しており、20855982 行の入力に対して 208969 レコードと同じくらい大きな違いがあります。レデューサーへの入力が減少した理由は何ですか?

4

1 に答える 1

0

データ損失の背後にある理由は、ブロックの 1 つで実行時例外が発生したため、そのブロックで利用可能なデータが完全に無視され、リデューサー入力が少なくなったためです。

ありがとう、サティシュ。

于 2013-07-01T03:40:13.057 に答える