1 つのファイルがマップされている間に存在/動作し、次のファイルがマップされているときにリセット/再作成される HashSet を使用したいと考えています。ファイルが分割されず、マッパーによって全体として処理されるように、TextInputFormat を変更して isSplitable をオーバーライドして false を返すようにしました。このようなことは可能ですか?または、Accumulo テーブルへの書き込みを減らす別の方法はありますか?
グローバル変数が必要だとは思わないことから始めましょう。一意性を確保して、Accumulo テーブルに書き込むミューテーションを減らしたいだけです。
私のプロジェクトは、Accumulo で同じテーブルを作成しながら、線形 accumulo クライアント プログラムのシャード サンプルから Index.java ファイルの機能を mapreduce 機能を使用するものに変換することです。それはバズワードであるため、mapreduce である必要があり、本質的に、テラバイトのデータに対して線形プログラムよりも高速に実行されます。
参照用のインデックス コードは次のとおりです 。シャード/Index.java
このプログラムは、BatchWriter を使用してミューテーションを Accumulo に書き込み、ファイル単位で行います。必要以上のミューテーションを書き込まないようにし、一意性を確保するために (Accumulo は圧縮によって最終的に同じキーをマージすると思いますが)、Index.java には HashSet があり、単語が以前に実行されたかどうかを判断するために使用されます。これはすべて、理解するのが比較的簡単です。
マップのみの mapreduce ジョブへの移行はより複雑です。
これは私のマッピングの試みでした。これは、Accumulo テーブルで見た部分的な出力からはうまくいくように見えますが、線形プログラム Index.java に比べて実行速度が非常に遅くなります。
public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
private HashSet<String> tokensSeen = new HashSet<String>();
@Override
public void map(LongWritable key, Text value, Context output) throws IOException {
FileSplit fileSplit = (FileSplit)output.getInputSplit();
System.out.println("FilePath " + fileSplit.getPath().toString());
String filePath = fileSplit.getPath().toString();
filePath = filePath.replace("unprocessed", "processed");
String[] words = value.toString().split("\\W+");
for (String word : words) {
Mutation mutation = new Mutation(genPartition(filePath.hashCode() % 10));
word = word.toLowerCase();
if(!tokensSeen.contains(word)) {
tokensSeen.add(word);
mutation.put(new Text(word), new Text(filePath), new Value(new byte[0]));
}
try {
output.write(null, mutation);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
遅い問題は、ZooKeeper と Accumulo を上部に持つ Hadoop の単一ノード インスタンスであるテスト インスタンスでこれらすべてを実行しているという事実かもしれません。だとしたら、一意性の解決策を見つけるしかありません。
提供されたヘルプやアドバイスは大歓迎です。