0

レデューサーを使用せずに hbase を読み書きしたい。

「The Apache HBase™ Reference Guide」の例に従いましたが、例外があります。

これが私のコードです:

public class CreateHbaseIndex { 
static final String SRCTABLENAME="sourceTable";
static final String SRCCOLFAMILY="info";
static final String SRCCOL1="name";
static final String SRCCOL2="email";
static final String SRCCOL3="power";

static final String DSTTABLENAME="dstTable";
static final String DSTCOLNAME="index";
static final String DSTCOL1="key";
public static void main(String[] args) {
    System.out.println("CreateHbaseIndex Program starts!...");
    try {
        Configuration config = HBaseConfiguration.create();
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        scan.addColumn(Bytes.toBytes(SRCCOLFAMILY), Bytes.toBytes(SRCCOL1));//info:name
        HBaseAdmin admin = new HBaseAdmin(config);
        if (admin.tableExists(DSTTABLENAME)) {
            System.out.println("table Exists.");
        }
        else{
            HTableDescriptor tableDesc = new HTableDescriptor(DSTTABLENAME);
            tableDesc.addFamily(new HColumnDescriptor(DSTCOLNAME));
            admin.createTable(tableDesc);
            System.out.println("create table ok.");
        }
        Job job = new Job(config, "CreateHbaseIndex");
        job.setJarByClass(CreateHbaseIndex.class);
        TableMapReduceUtil.initTableMapperJob(
                SRCTABLENAME, // input HBase table name
                scan, // Scan instance to control CF and attribute selection
                HbaseMapper.class, // mapper
                ImmutableBytesWritable.class, // mapper output key
                Put.class, // mapper output value
                job);
        job.waitForCompletion(true);
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
    System.out.println("Program ends!...");
}

public static class HbaseMapper extends TableMapper<ImmutableBytesWritable, Put> {
    private HTable dstHt;
    private Configuration dstConfig;
    @Override
    public void setup(Context context) throws IOException{
        dstConfig=HBaseConfiguration.create();
        dstHt = new HTable(dstConfig,SRCTABLENAME);
    }

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        // this is just copying the data from the source table...
        context.write(row, resultToPut(row,value));
    }

    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
        Put put = new Put(key.get());
        for (KeyValue kv : result.raw()) {
            put.add(kv);
        }
        return put;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        dstHt.close();
        super.cleanup(context);
    }
}
}

ちなみに、「sourceTable」はこんな感じです。

key  name    email
1    peter   a@a.com
2    sam     b@b.com

「dstTable」は次のようになります。

key    value
peter  1
sam    2

私はこの分野の初心者であり、あなたの助けが必要です. もぅ~

4

2 に答える 2

0

HBaseに書き込むためにレデューサーは必要ないというのは正しいですが、レデューサーが役立つ場合があります。インデックスを作成している場合、2つのマッパーが同じ行を書き込もうとしている状況に遭遇する可能性があります。それらが異なる列修飾子に書き込んでいることを確認するように注意しない限り、競合状態のために1つの更新を別の更新で上書きする可能性があります。HBaseは行レベルのロックを実行しますが、アプリケーションロジックに障害がある場合は役に立ちません。

例外が表示されない場合は、ソーステーブルから列ファミリーが存在しないインデックステーブルにキーと値のペアを書き込もうとしているため、失敗していると思います。

于 2012-11-19T16:37:31.867 に答える
0

このコードでは、出力形式を指定していません。次のコードを追加する必要があります

    job.setOutputFormatClass(TableOutputFormat.class);

    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,
            DSTTABLENAME);

また、セットアップで新しい構成を作成することは想定されていません。コンテキストから同じ構成を使用する必要があります。

于 2013-11-18T13:24:37.153 に答える