1

多くのファイルがあり、そのうちのいくつかは非常に小さいです。マッパーの数を減らすために、CombineFileInputFormat を使用したいと思います。ファイル名は、マッパー出力のキーの一部として使用されます。

CombineFileSplit の各チャンクのファイル名を取得するために、次のようないくつかの方法を試しましたが、すべて失敗しました。

1)conf.set("map.input.file", split.getPath(idx).toString());関数で見る

initNextRecordReader()クラスのCombineFileRecordReader。しかしNullPointerException

map()として、私の機能で起こりますcontext.getConfiguration().get("map.input.file")

戻りますnull

2)((FileSplit) (context.getInputSplit())).getPath().getName()マッパーでも試してみましたが、java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.CombineFileSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit起こります。

では、 CombineFileSplit で各ファイル名を取得するにはどうすればよいですか?

================================================== ==========

入力ファイルは lzo 圧縮されており、現時点ではインデックス化されていません。

以下は私のコードです:

私は次のように CombineFileInputFormat を実装します。

public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0,
            TaskAttemptContext arg1) throws IOException {
        // TODO Auto-generated method stub
        return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) arg0, arg1, CombineLzoLineRecordReader.class);
    }

}

これは、LzoLineRecordReader を拡張した CombineLzoLineRecordReader です。

public class CombineLzoLineRecordReader extends LzoLineRecordReader {
    private int index;

    public CombineLzoLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
        throws IOException, InterruptedException {
        this.index = index;
    }

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        CombineFileSplit combineSplit = (CombineFileSplit) genericSplit;
        FileSplit fileSplit = new FileSplit(combineSplit.getPath(index), combineSplit.getOffset(index), combineSplit.getLength(index), combineSplit.getLocations());
        super.initialize(fileSplit, context);
    }
}

そして、私のマップメソッドは次のようになります:

private String getName(String filePath) {
        String[] filePathDir = filePath.split("/");
        return filePathDir[filePathDir.length - 1];
    }

public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String name = getName(context.getConfiguration().get("map.input.file"));

        line = new String(value.getBytes(), 0, value.getLength(), "ISO-8859-1");
        lineFields = line.split("\t",-1);
        if (lineFields != null && lineFields.length >= 20) { 
                    // do something ...
        }
    }

そしてエラー情報:

13/06/14 17:02:50 INFO mapred.JobClient: Task Id : attempt_201209101415_762760_m_000000_0, Status : FAILED
java.lang.NullPointerException
        at com.netease.devilfish.hadoop.job.LogAnalysisDailyMapper.getName(Unknown Source)
4

1 に答える 1

4

CombineFileRecordReader は、マッパー コンテキストではなく、タスク コンテキストで入力パスを設定します。これらは 2 つの異なるコンテキスト オブジェクトであるため、最初のエラーです。私はこの同じ問題に直面しており、これが私がそれを解決した方法です

CombineLzoLineRecordReader とマッパー クラスの両方が同じ jvm で実行されるため、静的変数を介してデータを共有できます。CombineLzoLineRecordReader クラスを次のように変更します (アスタリスクの変更)。

public class CombineLzoLineRecordReader extends LzoLineRecordReader {
    private int index;
    **private static String currentPath**;

    public CombineLzoLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
        throws IOException, InterruptedException {
        this.index = index;
    }

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        CombineFileSplit combineSplit = (CombineFileSplit) genericSplit;
        FileSplit fileSplit = new FileSplit(combineSplit.getPath(index), combineSplit.getOffset(index), combineSplit.getLength(index), combineSplit.getLocations());
        **currentPath = fileSplit.getPath().toString();**
        super.initialize(fileSplit, context);
    }

    public static String getCurrentFilePath() {
        return currentFilePath;
    }
}

ファイル パスを取得するには、マッパー コードで CombineLzoLineRecordReader.getCurrentFilePath() を使用します。

于 2013-09-23T10:00:01.523 に答える