0

次の形式で hbase にログ データがあります。

hbase ソース テーブル

---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1

destテーブル(2013/09/25にmapreduceを実行した後、宛先テーブルに単語をキーとして追加し、カウントの合計をcolumn.dataとして追加します)

------------------
word(table key) count
------------------
apple 7
oranges 6
mangoes 6

データは毎日ソース テーブルに追加されますが、すべてのソース テーブル データに対して map reduce を実行したくありません。ということで、当日追加したデータだけマップリデュースしてみました。

2013/09/26 に新しいデータが追加されたソース テーブル。

---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1
2013/09/26 apple 10
2013/09/26 oranges 20

2013/09/26 データに対してのみ mapreduce を実行すると、dest テーブルに次のように表示されます。

新しいデータを含む dest テーブル (キーが同じであるため、リンゴとオレンジのカウントは 2013/09/26 データで更新されます。2013/09/25 までの古いデータはなくなりました):

------------------
word(table key) count
------------------
apple 10
oranges 10
mangoes 6 

予想される dest テーブル:

------------------
word(table key) count
------------------
apple 17
oranges 16
mangoes 6 

部分的なデータをマップして削減し、カウントを dest テーブルのカウント列に追加できますか、またはすべてのデータを毎回マップ削減する必要がありますか?

部分データを削減してカウントを更新できる場合、どうすればそれを行うことができますか。ここに私のマップ削減機能があります。

マップ機能:

public void map(ImmutableBytesWritable row,Result value,Context context) throws IOException {
    ImmutableBytesWritable key = new  ImmutableBytesWritable(row.get());
    String cf = "data";
    String column1 = "word";
    String column2 = "count";
    String word   = new String(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(column1)));
    Text t = new Text(word);
    context.write(t,value); 

}

縮小機能:

public void reduce(Text key,Iterable<Result> values,Context context) throws IOException,InterruptedException {
    int count=0;
    String cf = "data";
    String column = "count";
    for(Result val :values) {
        int d = Integer.parseInt(new String(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(column))))
        count += d;
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(cf.getBytes(), column.getBytes(), String.valueOf(count).getBytes());
    context.write(null, put);
}    
4

2 に答える 2

1

HBase を使用すると、列をカウンターとして扱うことができます。IncrementまたはincrementColumnValueにできます。それに関する優れた機能は、各インクリメントがアトミックであるため、複数のソース (マップ) から同時にインクリメントでき、合計が正確になることです。

それをマップ (または削減) で使用するには、コンテキストではなく、自分で HBase に書き込む必要があります。setup メソッドでテーブルを開き、クリーンアップで閉じる (または合計をインクリメントする) ことができます。

于 2013-09-27T13:04:21.980 に答える
0

データはどこにもありません。同じセルにデータを入れているので、新しいバージョンになります。テーブルをスキャンすると、デフォルトで最新バージョンのみが表示されます。新しいカウントを前のカウントに追加し、最終的な値をテーブルに挿入するロジックを記述する必要があります。

複数のバージョンを保持したくない場合は、最終的なカウントをテーブルに入れる前に、古いバージョンを削除して自分で処理する必要があります。

于 2013-09-27T13:01:32.793 に答える