次のように、マッパーが一連の値をレデューサーに渡すMapReduceアプリケーションを作成しようとしています。
Hello
World
HelloHelloWorld
こんにちは
_
_
ここで、これらの値を最初にグループ化してカウントし、次にさらに処理を実行します。私が書いたコードは次のとおりです。
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<String> records = new ArrayList<String>();
/* Collects all the records from the mapper into the list. */
for (Text value : values) {
records.add(value.toString());
}
/* Groups the values. */
Map<String, Integer> groupedData = groupAndCount(records);
Set<String> groupKeys = groupedData.keySet();
/* Writes the grouped data. */
for (String groupKey : groupKeys) {
System.out.println(groupKey + ": " + groupedData.get(groupKey));
context.write(NullWritable.get(), new Text(groupKey + groupedData.get(groupKey)));
}
}
public Map<String, Integer> groupAndCount(List<String> records) {
Map<String, Integer> groupedData = new HashMap<String, Integer>();
String currentRecord = "";
Collections.sort(records);
for (String record : records) {
System.out.println(record);
if (!currentRecord.equals(record)) {
currentRecord = record;
groupedData.put(currentRecord, 1);
} else {
int currentCount = groupedData.get(currentRecord);
groupedData.put(currentRecord, ++currentCount);
}
}
return groupedData;
}
しかし、出力では、すべてのカウントが1になります。sysoutステートメントは次のように出力されます。
Hello
World
Hello:1
World:1
Hello
Hello:1
Hello
World
Hello:1
World:1
Hi
Hi:1
問題が何であるか、そしてなぜすべてのレコードがレデューサーによって一度に受信されてgroupAndCount
メソッドに渡されないのか理解できません。