2

これが私のコードです:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySort extends Configured implements Tool{

public static void main(String[] args) {
    try {
        ToolRunner.run(new Configuration(), new SecondarySort(), args);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

static class KeyPartitioner implements Partitioner<StockKey, DoubleWritable> {

    @Override
    public int getPartition(StockKey arg0, DoubleWritable arg1, int arg2) {

        int partition = arg0.name.hashCode() % arg2;

        return partition;
    }

    @Override
    public void configure(JobConf job) {
    }

}


static class StockKey implements WritableComparable<StockKey> {

    String name;
    Long timestamp;

    public StockKey() {

    }

    StockKey(String name, Long timestamp){
        this.name = name;
        this.timestamp = timestamp;
    }

    @Override
    public void readFields(DataInput arg0) throws IOException {
        name = WritableUtils.readString(arg0);
        timestamp = arg0.readLong();
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        WritableUtils.writeString(arg0, name);
        arg0.writeLong(timestamp);
    }

    @Override
    public int compareTo(StockKey arg0) {
        int result = 0;

        result = name.compareToIgnoreCase(arg0.name);

        if(result == 0)
            result = timestamp.compareTo(arg0.timestamp);   

        return result;
    }

    public String toString() {
        String outputString = name+","+timestamp;
        return outputString;
    }
}


static class StockReducer implements Reducer<StockKey, DoubleWritable, Text, Text>{

    public void reduce(StockKey key, Iterator<DoubleWritable> value, Outp      
OutputCollector<Text, Text> context, Reporter reporter) 
                throws IOException {

        Text k = new Text(key.toString());

        while(value.hasNext()) {

            Double v = value.next().get();
            Text t = new Text(v.toString());

            context.collect(k, t);          
        }
    }

    @Override
    public void configure(JobConf job) {
        // TODO Auto-generated method stub

    }

    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub

    }
}


static class StockMapper implements Mapper<LongWritable, Text, StockKey, 
DoubleWritable> {

    public void map(LongWritable offset, Text value, OutputCollector<StockKey, 
DoubleWritable> context, Reporter reporter) 
                throws IOException {

        String[] values = value.toString().split(",");
        StockKey key = new StockKey(values[0].trim(), 
Long.parseLong(values[1].trim()));
        DoubleWritable val = new 
DoubleWritable(Double.parseDouble(values[2].trim()));


            context.collect(key, val);

    }

    @Override
    public void configure(JobConf job) {
        // TODO Auto-generated method stub

    }

    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub

    }

}

@SuppressWarnings("unchecked")
@Override
public int run(String[] arg) throws Exception {

    JobConf conf = new JobConf(getConf(), SecondarySort.class);
    conf.setJobName(SecondarySort.class.getName());

    conf.setJarByClass(SecondarySort.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    conf.setMapOutputKeyClass(StockKey.class);
    conf.setMapOutputValueClass(Text.class);

    conf.setPartitionerClass((Class<? extends Partitioner<StockKey, 
DoubleWritable>>) KeyPartitioner.class);

    conf.setMapperClass((Class<? extends Mapper<LongWritable, Text, StockKey, 
DoubleWritable>>) StockMapper.class);
    conf.setReducerClass((Class<? extends Reducer<StockKey, DoubleWritable, 
Text, Text>>) StockReducer.class);

    FileInputFormat.addInputPath(conf, new Path(arg[0]));
    FileOutputFormat.setOutputPath(conf, new Path(arg[1]));

    JobClient.runJob(conf);

    return 0;
}

}

例外は次のとおりです。

java.io.IOException: Type mismatch in value from map: expected 
org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
    at SecondarySort$StockMapper.map(SecondarySort.java:135)
    at SecondarySort$StockMapper.map(SecondarySort.java:1)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at     
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)


12/07/13 03:22:32 INFO mapred.JobClient: Task Id :   
attempt_201207130314_0002_m_000001_2, Status : FAILED

java.io.IOException: Type mismatch in value from map: expected 
org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
    at SecondarySort$StockMapper.map(SecondarySort.java:135)
    at SecondarySort$StockMapper.map(SecondarySort.java:1)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)
4

1 に答える 1

3

このコードには、それを引き起こす可能性のある非常に多くの潜在的な問題があります。

  • StockKey- デフォルトのhashCode()メソッドをオーバーライドする必要があります - 現時点StockKeyでは、同じ内容の 2 つの hashCode 値が異なります (JVM のデフォルトをオーバーライドしない場合と同様に、すべての範囲と目的である数値が返されます)。 2 つのオブジェクトのメモリ内のアドレス)。あなたのパーティショナーでは、nameフィールドのみを使用することを知っています (これは文字列であり、hashCode() の有効な実装がありますが、これは、将来Stockオブジェクト全体を使用し、 hashCode()2 つの同一の Stock オブジェクトが最終的に異なる減速機

  • KeyPartitionerMath.abs(..)-の結果が必要ですarg0.name.hashCode()。現時点では、この値は負に戻る可能性があります。レデューサーの数でモジュロすると、負の数が返されます。ノックオン効果は、MR フレームワークが 0 (含む) とレデューサーの数 (含まない) の間の数値を想定しているため、例外をスローすることです。次のポイントで説明するように、これはおそらくあなたの問題がある場所です

  • Mapper.mapmethod - を呼び出すときに、潜在的な出力例外を飲み込んでいますcontext.collect。パーティショナーに関する前のポイントから続けて - 負の数を返す場合、例外がスローされるため、対処する必要があります。場合によっては、例外をキャッチして飲み込んでも問題ない場合があります (入力レコードのデータ検証など) が、出力時に発生する例外はすべて MR フレームワークにスローして、何か問題があり、このマッパーの出力が間違っていることを示すフラグを立てる必要があります /不完全な:

    try {
        context.collect(key, val);
    } catch (IOException e) {
        e.printStackTrace();
    }
    
  • 最後に、マップを明示的に宣言し、出力タイプを削減する必要があります (実際にはマッパーが DoubleWritable を出力しているときに、現在マップ値の出力タイプをテキストとして宣言しているため、例外が発生しています)。

job.setMapOutputKeyClass(StockKey.class); job.setMapOutputValueClass(DoubleWritable.class);

job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);

context.collect 呼び出しの周りの try/catch ブロックを削除して、ジョブを再実行することをお勧めします (または、マップ タスクのログをチェックして、スタック トレースが表示されるかどうかを確認します)。

于 2012-07-12T10:28:32.117 に答える