私の cassandra scehma には、列ファミリーに 1 つの行しかありません。mapreduce を実行すると、マッパーは同じ単一行を何度も読み取り続けます。したがって、マッパーは無限大になり、リデューサーは立ち往生します....
これらは使用される構成です
conf.set("fs.default.name", "hdfs://28.151.181.107:9000");
conf.set("mapred.job.tracker", "28.151.181.107:9001");
conf.setJar("C:\\hadoop-test\\demo\\target\\demo-0.0.1-SNAPSHOT.jar");
conf.setMapperClass(TokenizerMapper.class);
conf.setCombinerClass(ReducerToFilesystem.class);
conf.setReducerClass(ReducerToFilesystem.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(conf, new Path(resultFileName));
conf.setInputFormat(ColumnFamilyInputFormat.class);
ConfigHelper.setInputRpcPort(conf, PORT + "");
ConfigHelper.setInputInitialAddress(conf, HOST);
ConfigHelper.setInputPartitioner(conf, "RandomPartitioner");
ConfigHelper.setInputColumnFamily(conf, KEY_SPACE, COLUMN_FAMILY,true);
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(conf, predicate);
ConfigHelper.setOutputInitialAddress(conf, HOST);
ConfigHelper.setOutputPartitioner(conf, "RandomPartitioner");
Mapper & Reducer は
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns,
OutputCollector<Text, Text> paramOutputCollector,
Reporter paramReporter) throws IOException {
DateSerializer sz = new DateSerializer();
StringSerializer s = new StringSerializer();
for (IColumn col : columns.values()) {
Date name = sz.fromByteBuffer(col.name());
double value = ByteBufferUtil.toDouble(col.value());
paramOutputCollector.collect(new Text(s.fromByteBuffer(key)),
new Text(name.toGMTString() + " [] [] " + value));
}
}
public static class ReducerToFilesystem implements
Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> paramOutputCollector,
Reporter paramReporter) throws IOException {
StringBuffer bfr = new StringBuffer();
while (values.hasNext()) {
Text val = values.next();
bfr.append(val);
bfr.append("<--->");
}
paramOutputCollector.collect(key, new Text(bfr.toString()));
}
ガイドをお願いします。
助けてくれてありがとう!