0

私の cassandra scehma には、列ファミリーに 1 つの行しかありません。mapreduce を実行すると、マッパーは同じ単一行を何度も読み取り続けます。したがって、マッパーは無限大になり、リデューサーは立ち往生します....

これらは使用される構成です

conf.set("fs.default.name", "hdfs://28.151.181.107:9000");
    conf.set("mapred.job.tracker", "28.151.181.107:9001");
    conf.setJar("C:\\hadoop-test\\demo\\target\\demo-0.0.1-SNAPSHOT.jar");

    conf.setMapperClass(TokenizerMapper.class);
    conf.setCombinerClass(ReducerToFilesystem.class);
    conf.setReducerClass(ReducerToFilesystem.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Text.class);

    FileOutputFormat.setOutputPath(conf, new Path(resultFileName));

    conf.setInputFormat(ColumnFamilyInputFormat.class);

    ConfigHelper.setInputRpcPort(conf, PORT + "");
    ConfigHelper.setInputInitialAddress(conf, HOST);
    ConfigHelper.setInputPartitioner(conf, "RandomPartitioner");
    ConfigHelper.setInputColumnFamily(conf, KEY_SPACE, COLUMN_FAMILY,true);
    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setStart(new byte[0]);
    sliceRange.setFinish(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(conf, predicate);
    ConfigHelper.setOutputInitialAddress(conf, HOST);
    ConfigHelper.setOutputPartitioner(conf, "RandomPartitioner");

Mapper & Reducer は

public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns,
            OutputCollector<Text, Text> paramOutputCollector,
            Reporter paramReporter) throws IOException {

        DateSerializer sz = new DateSerializer();
        StringSerializer s = new StringSerializer();

        for (IColumn col : columns.values()) {
            Date name = sz.fromByteBuffer(col.name());

            double value = ByteBufferUtil.toDouble(col.value());
            paramOutputCollector.collect(new Text(s.fromByteBuffer(key)),
                    new Text(name.toGMTString() + " [] []  " + value));
        }

    }


public static class ReducerToFilesystem implements
        Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> paramOutputCollector,
            Reporter paramReporter) throws IOException {
        StringBuffer bfr = new StringBuffer();
        while (values.hasNext()) {
            Text val = values.next();
            bfr.append(val);
            bfr.append("<--->");

        }

        paramOutputCollector.collect(key, new Text(bfr.toString()));

    }

ガイドをお願いします。

助けてくれてありがとう!

4

1 に答える 1