
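I have a MapReduce tool that freezes on the first mapper without any discernible output. Because this is a single-node installation (the log shows the job running under LocalJobRunner), there is no JobTracker web interface I can use to debug it. The behavior is the same regardless of the input file size. I've been hacking on this all day and I'm ready to pull my hair out. The output looks like this: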

13/09/12 15:12:14 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/09/12 15:12:14 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/09/12 15:12:14 INFO input.FileInputFormat: Total input paths to process : 1
13/09/12 15:12:14 INFO mapred.JobClient: Running job: job_local1132137425_0001
13/09/12 15:12:14 INFO mapred.LocalJobRunner: Waiting for map tasks
13/09/12 15:12:14 INFO mapred.LocalJobRunner: Starting task: attempt_local1132137425_0001_m_000000_0
13/09/12 15:12:14 INFO util.ProcessTree: setsid exited with exit code 0
13/09/12 15:12:14 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@339c98d3
13/09/12 15:12:14 INFO mapred.MapTask: Processing split: file:/home/axelmagn/EclipseWorkspace/AxelMagnusonCoursework/assign-2/data/in/input.csv:0+33554432
13/09/12 15:12:14 WARN snappy.LoadSnappy: Snappy native library not loaded
13/09/12 15:12:14 INFO mapred.MapTask: io.sort.mb = 100
13/09/12 15:12:14 INFO mapred.MapTask: data buffer = 79691776/99614720
13/09/12 15:12:14 INFO mapred.MapTask: record buffer = 262144/327680
13/09/12 15:12:15 INFO mapred.JobClient:  map 0% reduce 0%
13/09/12 15:12:15 INFO mapred.MapTask: Starting flush of map output
13/09/12 15:12:15 INFO mapred.MapTask: Starting flush of map output
13/09/12 15:12:20 INFO mapred.LocalJobRunner: 
13/09/12 15:12:21 INFO mapred.JobClient:  map 20% reduce 0%

After that it hangs indefinitely.
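The io.sort.mb, data buffer and record buffer lines in the log are informational: they describe how the map-side sort buffer was sized. The sketch below re-derives those numbers in plain Java, assuming Hadoop 1.x MapTask defaults (io.sort.mb = 100, io.sort.record.percent = 0.05, io.sort.spill.percent = 0.80, and 16 bytes of bookkeeping per buffered record). SortBufferMath and compute are hypothetical names, not Hadoop APIs. The block also decodes the split line: 0+33554432 is offset 0, length 33554432 bytes, i.e. the first 32 MB of input.csv.

```java
// Hypothetical helper: recomputes the sort buffer sizes MapTask logs at startup,
// assuming Hadoop 1.x defaults and 16 bytes of accounting per map output record.
public class SortBufferMath {
    static final int RECSIZE = 16; // assumed bytes of index/accounting data per record

    /** Returns {softDataLimit, dataBufferLength, softRecordLimit, recordCapacity}. */
    static int[] compute(int sortMb, float recordPercent, float spillPercent) {
        int maxMemUsage = sortMb << 20;                          // 100 MB -> 104857600 bytes
        int recordBytes = (int) (maxMemUsage * recordPercent);   // share kept for record metadata
        recordBytes -= recordBytes % RECSIZE;                    // whole records only
        int dataBuffer = maxMemUsage - recordBytes;              // room for serialized keys/values
        int recordCapacity = recordBytes / RECSIZE;              // max records buffered at once
        int softData = (int) (dataBuffer * spillPercent);        // a spill starts past this point
        int softRecords = (int) (recordCapacity * spillPercent);
        return new int[] {softData, dataBuffer, softRecords, recordCapacity};
    }

    public static void main(String[] args) {
        int[] r = compute(100, 0.05f, 0.80f);
        System.out.println("data buffer = " + r[0] + "/" + r[1]);
        System.out.println("record buffer = " + r[2] + "/" + r[3]);
        System.out.println("split length = " + (33554432 >> 20) + " MB");
    }
}
```

Running main prints data buffer = 79691776/99614720 and record buffer = 262144/327680, matching the log, so the buffer settings look like stock defaults rather than a cause of the hang.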

Tool routine (abridged):

        tempPath = new Path("/tmp/" + outDirPath.getName() + "_1_" + now);
        tempPath2 = new Path("/tmp/" + outDirPath.getName() + "_2_" + now);
        job1 = new VisitorCountJob(inFilePath, tempPath);
        success = job1.waitForCompletion(true);
        if (!success)
            throw new Exception("Visitor Count Job Failed.");
        job2 = new TopVisitorJob(tempPath, outDirPath, TOPN);
        success = job2.waitForCompletion(true);
        return success ? 0 : 1;

Job:

public class VisitorCountJob extends Job {

    public static final String TAB = "\t";

    public VisitorCountJob(Path inputPath, Path outputPath)
            throws IOException {
        super();
        this.setJarByClass(VisitorCountJob.class);
        this.setJobName("Visitor Count");

        this.setInputFormatClass(VisitInputFormat.class);

        VisitInputFormat.setInputPaths(this, inputPath);
        FileOutputFormat.setOutputPath(this, outputPath);

        this.setMapperClass(VisitorCountMapper.class);
        this.setReducerClass(VisitorCountReducer.class);

        this.setOutputKeyClass(Person.class);
        this.setOutputValueClass(IntWritable.class);

        this.setOutputFormatClass(SequenceFileOutputFormat.class);
    }

}

Mapper:

public class VisitorCountMapper extends
        Mapper<LongWritable, Visit, Person, IntWritable> {

    @Override
    public void map(LongWritable key, Visit value, Context context)
            throws IOException, InterruptedException {

        try {
            Person visitor = value.getVisitor();
            context.write(visitor, new IntWritable(1));
        } catch (IOException e) {
            e.printStackTrace();
            throw e;
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw e;
        }
    }
}

Reducer:

public class VisitorCountReducer extends
        Reducer<Person, IntWritable, Person, IntWritable> {

    @Override
    public void reduce(Person visitor, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(visitor, new IntWritable(count));
    }

}
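For reference, job 1 (the mapper emits (visitor, 1) and the reducer sums) should produce one count per distinct Person, ordered by first name and then last name as in Person.compareTo. A plain-Java, in-memory sketch of that expected result can be handy for checking a small sample of input.csv without Hadoop. VisitorCountSketch, the String[] {firstName, lastName} stand-in for Person, and the sample names are all hypothetical; String.compareTo matches Text's byte-wise ordering for ASCII names but may differ for some non-ASCII ones.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory sketch of what the Visitor Count job should output.
// A visitor is a String[] {firstName, lastName} standing in for Person.
public class VisitorCountSketch {
    // Same ordering as Person.compareTo: first name, then last name.
    static final Comparator<String[]> BY_FIRST_THEN_LAST = (a, b) -> {
        int out = a[0].compareTo(b[0]);
        return out != 0 ? out : a[1].compareTo(b[1]);
    };

    static Map<String[], Integer> countVisitors(List<String[]> visitors) {
        // The sorted map plays the role of the shuffle/sort that groups equal keys.
        Map<String[], Integer> counts = new TreeMap<>(BY_FIRST_THEN_LAST);
        for (String[] visitor : visitors) {
            counts.merge(visitor, 1, Integer::sum); // map emits 1, reduce adds them up
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> visits = List.of(
                new String[] {"Alice", "Smith"},
                new String[] {"Bob", "Jones"},
                new String[] {"Alice", "Smith"});
        for (Map.Entry<String[], Integer> e : countVisitors(visits).entrySet()) {
            System.out.println(e.getKey()[0] + " " + e.getKey()[1] + "\t" + e.getValue());
        }
    }
}
```

With the sample above, main prints Alice Smith with a count of 2 followed by Bob Jones with a count of 1.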

I also wrote an InputFormat and a RecordReader that produce Visit objects from the raw text, but I'll leave them out for brevity unless someone thinks they're relevant.

I'm really at my wits' end, so any help is greatly appreciated.

Edit: since there has been some interest, here are some of my data type implementations.

Person:

public class Person implements WritableComparable<Person>  {

    public Text firstName;
    public Text lastName;

    public Person() {}

    public Person(Text firstName, Text lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public Person(String firstName, String lastName) {
        this(new Text(firstName), new Text(lastName));
    }

    public void readFields(DataInput in) throws IOException {
        firstName.readFields(in);
        lastName.readFields(in);

    }

    public void write(DataOutput out) throws IOException {
        firstName.write(out);
        lastName.write(out);
    }

    public int compareTo(Person other) {

        int out;

        // give sorting preference to first name
        out = firstName.compareTo(other.firstName);
        if(out != 0)
            return out;
        return lastName.compareTo(other.lastName);
    }

}
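One thing that may be worth checking in Person: the no-arg constructor leaves firstName and lastName null. When Hadoop has to deserialize a key (for example, the default WritableComparator comparing keys during the sort that runs after "Starting flush of map output"), it creates the key through the no-arg constructor and then calls readFields, so firstName.readFields(in) on a null Text would throw a NullPointerException. In Hadoop terms, a likely fix is public Person() { this(new Text(), new Text()); }. Person also does not override hashCode(), which HashPartitioner uses to choose a reducer; that makes no difference with a single local reducer, but on a cluster equal visitors could land on different reducers. The plain-Java stand-in below (PersonKey is a hypothetical name, and String with writeUTF/readUTF replaces Text) shows the intended shape: non-null fields after the no-arg constructor, a write-and-read-back round trip through a fresh instance, and equals/hashCode consistent with compareTo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical plain-Java stand-in for Person (String replaces Hadoop's Text).
public class PersonKey implements Comparable<PersonKey> {
    // Never null, so a key built by the no-arg constructor can be read into safely.
    private String firstName = "";
    private String lastName = "";

    public PersonKey() {}

    public PersonKey(String firstName, String lastName) {
        this.firstName = Objects.requireNonNull(firstName);
        this.lastName = Objects.requireNonNull(lastName);
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstName);
        out.writeUTF(lastName);
    }

    public void readFields(DataInput in) throws IOException {
        firstName = in.readUTF();
        lastName = in.readUTF();
    }

    // Same ordering as Person.compareTo: first name, then last name.
    @Override
    public int compareTo(PersonKey other) {
        int out = firstName.compareTo(other.firstName);
        return out != 0 ? out : lastName.compareTo(other.lastName);
    }

    // Consistent with compareTo, so equal visitors hash (and partition) identically.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PersonKey)) return false;
        PersonKey p = (PersonKey) o;
        return firstName.equals(p.firstName) && lastName.equals(p.lastName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName);
    }

    // Mimics deserialization: write the key, then read it into a fresh no-arg instance.
    static PersonKey roundTrip(PersonKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            PersonKey copy = new PersonKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with in-memory streams
        }
    }
}
```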

VisitInputFormat:

public class VisitInputFormat extends FileInputFormat<LongWritable, Visit> {

    public RecordReader<LongWritable, Visit> createRecordReader(
            InputSplit split, TaskAttemptContext context) 
            throws IOException, InterruptedException {

        VisitRecordReader reader = new VisitRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

VisitRecordReader:

public class VisitRecordReader extends RecordReader<LongWritable, Visit> {
    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public VisitRecordReader() {
        lineReader = new LineRecordReader();
    }

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        lineReader.initialize(genericSplit, context);
    }

    public boolean nextKeyValue() throws IOException {
        return lineReader.nextKeyValue();
    }

    public LongWritable getCurrentKey() {
        return lineReader.getCurrentKey();
    }

    public Visit getCurrentValue() {
        String raw = lineReader.getCurrentValue().toString();
        return new Visit(raw);
    }

    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    public void close() throws IOException {
        lineReader.close();
    }

}
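Separately, the InputFormat.createRecordReader javadoc says the framework calls RecordReader.initialize(InputSplit, TaskAttemptContext) itself before the split is used. Because VisitInputFormat also calls reader.initialize(split, context), the wrapped LineRecordReader is initialized twice, opening the input a second time and leaving the first stream unclosed. That may or may not be related to the hang, but returning an uninitialized reader (return new VisitRecordReader();) matches the contract. The toy model below is plain Java with hypothetical names and does not use Hadoop; it only counts initialize calls under the two styles.

```java
// Toy model (all names hypothetical) of the new-API contract: createRecordReader()
// returns a reader, then the framework calls initialize(split) on it before reading.
public class ReaderLifecycleSketch {
    static final class CountingReader {
        int initializeCalls = 0; // stands in for "times the split was opened"

        void initialize(String split) {
            initializeCalls++;
        }
    }

    interface Format {
        CountingReader createRecordReader(String split);
    }

    // Mirrors VisitInputFormat: initializes the reader before returning it.
    static final Format EAGER = split -> {
        CountingReader reader = new CountingReader();
        reader.initialize(split);
        return reader;
    };

    // Leaves initialization to the framework, as the javadoc describes.
    static final Format LAZY = split -> new CountingReader();

    // Simulates the framework: create the reader, then initialize it once before use.
    static int opensFor(Format format, String split) {
        CountingReader reader = format.createRecordReader(split);
        reader.initialize(split);
        return reader.initializeCalls;
    }

    public static void main(String[] args) {
        System.out.println("eager: " + opensFor(EAGER, "input.csv:0+33554432")); // 2
        System.out.println("lazy: " + opensFor(LAZY, "input.csv:0+33554432"));   // 1
    }
}
```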
