1

Hadoop-1.2.1 を使用しており、ToolRunner を使用して単純な RowCount HBase ジョブを実行しようとしています。ただし、何を試しても、hadoop はマップ クラスを見つけることができません。jar ファイルは hdfs に正しくコピーされていますが、どこが間違っているのかわかりません。助けてください!

コードは次のとおりです。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;


import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class HBaseRowCountToolRunnerTest extends Configured implements Tool
{

    // What to copy.
    public static final String JAR_NAME = "myJar.jar";
    public static final String LOCAL_JAR = <path_to_jar> + JAR_NAME;
    public static final String REMOTE_JAR = "/tmp/"+JAR_NAME;


    public static void main(String[] args) throws Exception 
    {
        Configuration config = HBaseConfiguration.create();

//All connection configs set here -- omitted to post the code 

        config.set("tmpjars", REMOTE_JAR);


        FileSystem dfs = FileSystem.get(config);

        System.out.println("pathString = " + (new Path(LOCAL_JAR)).toString() + " \n");

        // Copy jar file to remote.
        dfs.copyFromLocalFile(new Path(LOCAL_JAR), new Path(REMOTE_JAR));

        // Get rid of jar file when we're done.
        dfs.deleteOnExit(new Path(REMOTE_JAR));

        // Run the job.
        System.exit(ToolRunner.run(config, new HBaseRowCountToolRunnerTest(), args));
    }

    @Override
    public int run(String[] args) throws Exception 
    {
        Job job = new RowCountJob(getConf(), "testJob", "myLittleHBaseTable");

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class RowCountJob extends Job
    {

        RowCountJob(Configuration conf, String jobName, String tableName) throws IOException
        {
            super(conf, RowCountJob.class.getCanonicalName() + "_" + jobName);

            setJarByClass(getClass()); 

            Scan scan = new Scan();
            scan.setCacheBlocks(false);
            scan.setFilter(new FirstKeyOnlyFilter());

            setOutputFormatClass(NullOutputFormat.class);

            TableMapReduceUtil.initTableMapperJob(tableName, scan,
                    RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, this);

            setNumReduceTasks(0);

        }

    }//end public static class RowCountJob extends Job

    //Mapper that runs the count
    //TableMapper -- TableMapper<KEYOUT, VALUEOUT> (*OUT by type)
    public static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> 
    {

        //Counter enumeration to count the actual rows
        public static enum Counters {ROWS}

        /**
         * Maps the data.
         *
         * @param row  The current table row key.
         * @param values  The columns.
         * @param context  The current context.
         * @throws IOException When something is broken with the data.
         * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
         *   org.apache.hadoop.mapreduce.Mapper.Context)
         */
        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException 
        {
            // Count every row containing data times 2, whether it's in qualifiers or values
            context.getCounter(Counters.ROWS).increment(2);
        }

    }//end public static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> 


}//end public static void main(String[] args) throws Exception
4

2 に答える 2

1

わかりました-問題の回避策を見つけたので、同様の問題を抱えている他のすべての人に共有すると思いました...

結局のところ、私は tmpjars 構成オプションを放棄し、コード自体から DistributedCache に向けられた jar ファイルをコピーしただけです。これは次のようになります。

// Copy jar file to remote.
FileSystem dfs = FileSystem.get(conf);
dfs.copyFromLocalFile(new Path(LOCAL_JAR), new Path(REMOTE_JAR));

// Get rid of jar file when we're done.
dfs.deleteOnExit(new Path(REMOTE_JAR));

//Place it in the distributed cache
DistributedCache.addFileToClassPath(new Path(REMOTE_JAR), conf, dfs);

おそらく、tmpjars で何が起こっているのかは解決されませんが、機能します。

于 2013-10-01T13:35:30.230 に答える