HBaseのテーブルtest(info:date, info:temp)から平均気温を計算し、結果をテーブルresult(info:date, info:avg)に格納したいのですが、プログラムを実行するとエラーが発生します。
コードは次のとおりです。
public static class mapper1 extends TableMapper<Text,FloatWritable>
{
public static final byte[] Info = "info".getBytes();
public static final byte[] Date = "date".getBytes();
public static final byte[] Temp = "temp".getBytes();
private static Text key=new Text();
public void map(ImmutableBytesWritable row,Result value,Context context)
throws IOException
{
String k1 = new String(value.getValue(Info, Date));
key.set(k1);
byte[] val=value.getValue(Info,Temp);
try
{
context.write(key,new
FloatWritable(Float.parseFloat(Bytes.toString(val))));
}
catch(InterruptedException e)
{
throw new IOException(e);
}
}}
// * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * *
public static class reducer1 extends TableReducer<Text,Result,Text>
{
public static final byte[] info = "info".getBytes();
public static final byte[] date = "date".getBytes();
byte[] avg ;
public void reduce(Text key,Iterable<FloatWritable>values, Context context)
throws IOException, InterruptedException
{
float sum=0;
int count=0;
float average=0;
for(FloatWritable val:values)
{
sum+=val.get();
count++;
}
average=(sum/count);
Put put = new Put(Bytes.toBytes(key.toString()));
put.add(info, date, Bytes.toBytes(average));
System.out.println("For\t"+count+"\t average is:"+average);
context.write(key,put);
}
}
// * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * ** * **
/**
 * Configures and runs the "AVG" job: scans family {@code info} of table
 * {@code test} with {@code mapper1} and writes the output of {@code reducer1}
 * to table {@code result}.
 *
 * @param args command-line arguments (not used)
 * @throws IOException            if job setup fails or the job does not succeed
 * @throws ClassNotFoundException if a job class cannot be loaded
 * @throws InterruptedException   if waiting for the job is interrupted
 */
public static void main(String args[]) throws
        IOException, ClassNotFoundException, InterruptedException
{
    Configuration config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum", "localhost");

    // No HTable handles are opened here: the table input/output formats open
    // the tables themselves, and the unused handles were never closed.
    Job job = new Job(config, "AVG");
    // Lets Hadoop locate the jar containing the mapper/reducer classes.
    job.setJarByClass(mapper1.class);

    // Scan the whole "info" family. No FirstKeyOnlyFilter: that filter returns
    // only the first cell of each row, but the mapper needs both info:date
    // and info:temp.
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("info"));

    TableMapReduceUtil.initTableMapperJob(
            "test",
            scan,
            mapper1.class,
            Text.class,
            FloatWritable.class,
            job);
    TableMapReduceUtil.initTableReducerJob(
            "result",
            reducer1.class,
            job);
    job.setNumReduceTasks(1);

    // NOTE(review): a NullPointerException from DNS.reverseDns inside
    // TableInputFormatBase.getSplits is raised before any mapper code runs.
    // It presumably means the region server address reported by HBase cannot
    // be resolved from this client -- verify hostname resolution
    // (/etc/hosts, DNS) for the HBase hosts; TODO confirm.
    boolean b = job.waitForCompletion(true);
    if (!b) {
        throw new IOException("error with job!");
    }
}
}
エラーメッセージは次のとおりです。
Exception in thread "main" java.lang.NullPointerException
at org.apache.hadoop.net.DNS.reverseDns(DNS.java:92)
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.reverseDNS(TableInputFormatBase.java:223)
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:189)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:452)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:469)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:366)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1218)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1215)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1367)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1215)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1236)
at TempVar.AVG.main(AVG.java:126)
手伝って頂けますか?