0.18 バージョンのドキュメントを使用していることに気付きました。ここに 1.0.2 (最新) へのリンクがあります。
最初のアドバイス - IDE (Eclipse、IDEA など) を使用してください。空欄を埋めるのにとても役立ちます。
実際の HDFS では、ファイルの各部分がどこにあるのか (異なるマシンとクラスター) を知ることはできません。行 X が行 Y と同じディスク上に存在するという保証さえありません。また、行 X が異なるマシン間で分割されないという保証もありません (HDFS はデータをブロック単位で分散し、通常はそれぞれ 64Mb です)。これは、同じマッパーがファイル全体を処理すると想定できないことを意味します。確認できることは、各ファイルが同じ reducer によって処理されることです。
リデューサーはマッパーから送信されたキーごとに一意であるため、これを行う方法は、ファイル名をマッパーの出力キーとして使用することです。さらに、マッパーのデフォルトの入力クラスは ですTextInputFormat
。これは、各マッパーが独自に行全体を受け取ることを意味します (LF または CR で終了)。次に、マッパーからファイル名と数字の 1 (またはその他、計算には関係ありません) を出力できます。次に、レデューサーで、ループを使用して、ファイル名が受信された回数をカウントします。
マッパーの map 関数で
public static class Map extends Mapper<IntWritable, Text, Text, Text> {
public void map(IntWritable key, Text value, Context context) {
// get the filename
InputSplit split = context.getInputSplit();
String fileName = split.getPath().getName();
// send the filename to the reducer, the value
// has no meaning (I just put "1" to have something)
context.write( new Text(fileName), new Text("1") );
}
}
レデューサーのreduce関数で
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text fileName, Iterator<Text> values, Context context) {
long rowcount = 0;
// values get one entry for each row, so the actual value doesn't matter
// (you can also get the size, I'm just lazy here)
for (Text val : values) {
rowCount += 1;
}
// fileName is the Text key received (no need to create a new object)
context.write( fileName, new Text( String.valueOf( rowCount ) ) );
}
}
ドライバー/メインで
wordcount の例とほぼ同じドライバーを使用できます。新しい mapreduce API を使用したことに注意してJob
くださいJobConf
。読んでいてとても参考になりました。
MR 出力は、各ファイル名とその行数だけになることに注意してください。
input1.txt 3
input2.txt 4
input3.txt 9
すべてのファイルの TOTAL 行数を数えたい場合は、すべてのマッパーで同じキーを発行するだけです (ファイル名ではありません)。このようにして、すべての行カウントを処理するレデューサーが 1 つだけになります。
// no need for filename
context.write( new Text("blah"), new Text("1") );
また、ファイルごとの行数の出力を処理するジョブをチェーンしたり、その他の凝った処理を行ったりすることもできます。それはあなた次第です。
定型コードをいくつか省略しましたが、基本はそこにあります。私はこれのほとんどをメモリから入力していたので、必ず私をチェックしてください.. :)
お役に立てれば!