1

マッパーからレデューサーに IntWritable を渡そうとすると、次のエラーが発生します。

INFO mapreduce.Job: Task Id : attempt_1413976354988_0009_r_000000_1, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.hbase.client.Mutation

これは私のマッパーです:

public class testMapper extends TableMapper<Object, Object>
{

    public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException, InterruptedException
    {

        try
        {
            // get rowKey and convert it to string
            String inKey = new String(rowKey.get());
            // set new key having only date
            String oKey = inKey.split("#")[0];
            // get sales column in byte format first and then convert it to
            // string (as it is stored as string from hbase shell)
            byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            String sSales = new String(bSales);
            Integer sales = new Integer(sSales);
            // emit date and sales values
            context.write(new ImmutableBytesWritable(oKey.getBytes()), new IntWritable(sales));

        }

これは私のレデューサーです:

public class testReducer extends TableReducer<Object, Object, Object>
{

    public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        try
        {

            int sum = 0;
            // loop through different sales vales and add it to sum
            for (IntWritable sales : values)
            {
                Integer intSales = new Integer(sales.toString());
                sum += intSales;
            }

            // create hbase put with rowkey as date

            Put insHBase = new Put(key.get());
            // insert sum value to hbase
            insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            // write data to Hbase table
            context.write(null, insHBase);

そしてドライバー:

public class testDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        // define scan and define column families to scan
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf1"));

        Job job = Job.getInstance(conf);
        job.setJarByClass(testDriver.class);

        // define input hbase table
        TableMapReduceUtil.initTableMapperJob("test1", scan, testMapper.class, ImmutableBytesWritable.class, IntWritable.class, job);
        // define output table
        TableMapReduceUtil.initTableReducerJob("test2", testReducer.class, job);

        job.waitForCompletion(true);
    }
}
4

1 件の回答