8

出力を HBase に書き込む必要がある Haddop ジョブがあります。レデューサーはあまり必要ありません。挿入する行の種類はマッパーで決定されます。

これを達成するために TableOutputFormat を使用するにはどうすればよいですか? 私が見たすべての例から、Reducer は Put を作成するものであり、TableMapper は HBase テーブルから読み取るためのものであるという前提がありました。

私の場合、入力は HDFS であり、出力は特定のテーブルに置かれます。TableMapReduceUtil には、それを助けるものは何も見つかりません。

それを助けることができる例はありますか?

ところで、私は新しい Hadoop API を使用しています

4

2 に答える 2

7

これは、ファイルから読み取り、すべての行を Hbase に入れる例です。この例は「Hbase: The definitive guide」からのもので、リポジトリで見つけることができます。取得するには、コンピューターでレポを複製するだけです。

git clone git://github.com/larsgeorge/hbase-book.git

この本では、コードに関するすべての説明も見つけることができます。でも、わからないことがあれば遠慮なく聞いてください。

`    public class ImportFromFile {
     public static final String NAME = "ImportFromFile"; 
     public enum Counters { LINES }

     static class ImportMapper
     extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { 
       private byte[] family = null;
       private byte[] qualifier = null;

       @Override
       protected void setup(Context context)
         throws IOException, InterruptedException {
         String column = context.getConfiguration().get("conf.column");
         byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
         family = colkey[0];
         if (colkey.length > 1) {
           qualifier = colkey[1];
         }
       }

       @Override
       public void map(LongWritable offset, Text line, Context context) 
       throws IOException {
          try {
           String lineString = line.toString();
           byte[] rowkey = DigestUtils.md5(lineString); 
           Put put = new Put(rowkey);
           put.add(family, qualifier, Bytes.toBytes(lineString)); 
           context.write(new ImmutableBytesWritable(rowkey), put);
           context.getCounter(Counters.LINES).increment(1);
         } catch (Exception e) {
           e.printStackTrace();
         }
       }
     }

     private static CommandLine parseArgs(String[] args) throws ParseException { 
       Options options = new Options();
       Option o = new Option("t", "table", true,
         "table to import into (must exist)");
       o.setArgName("table-name");
       o.setRequired(true);
       options.addOption(o);
       o = new Option("c", "column", true,
         "column to store row data into (must exist)");
       o.setArgName("family:qualifier");
       o.setRequired(true);
       options.addOption(o);
       o = new Option("i", "input", true,
         "the directory or file to read from");
       o.setArgName("path-in-HDFS");
       o.setRequired(true);
       options.addOption(o);
       options.addOption("d", "debug", false, "switch on DEBUG log level");
       CommandLineParser parser = new PosixParser();
       CommandLine cmd = null;
       try {
         cmd = parser.parse(options, args);
       } catch (Exception e) {
         System.err.println("ERROR: " + e.getMessage() + "\n");
         HelpFormatter formatter = new HelpFormatter();
         formatter.printHelp(NAME + " ", options, true);
         System.exit(-1);
       }
       return cmd;
     }

     public static void main(String[] args) throws Exception {
       Configuration conf = HBaseConfiguration.create();
       String[] otherArgs =
         new GenericOptionsParser(conf, args).getRemainingArgs(); 
       CommandLine cmd = parseArgs(otherArgs);
       String table = cmd.getOptionValue("t");
       String input = cmd.getOptionValue("i");
       String column = cmd.getOptionValue("c");
       conf.set("conf.column", column);
       Job job = new Job(conf, "Import from file " + input + " into table " + table); 

            job.setJarByClass(ImportFromFile.class);
       job.setMapperClass(ImportMapper.class);
       job.setOutputFormatClass(TableOutputFormat.class);
       job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
       job.setOutputKeyClass(ImmutableBytesWritable.class);
       job.setOutputValueClass(Writable.class);
       job.setNumReduceTasks(0); 
       FileInputFormat.addInputPath(job, new Path(input));
       System.exit(job.waitForCompletion(true) ? 0 : 1);
     }
    }`
于 2012-06-16T10:08:06.310 に答える
1

マッパーにペアを出力させる必要があります。OutputFormat出力キー値を永続化する方法のみを指定します。キー値がレデューサーからのものであるとは限りません。マッパーで次のようなことを行う必要があります。

... extends TableMapper<ImmutableBytesWritable, Put>() {
    ...
    ...
    context.write(<some key>, <some Put or Delete object>);
}
于 2012-06-16T08:25:55.950 に答える