1

ファイルのグループで構成される Map タスクの分割を作成するために、カスタムの結合ファイル入力形式を実装しました。スプリットの各ファイルをレコードリーダーに渡すソリューションを作成しましたが、すべて問題ありません。今、マップ関数にファイルのセット全体を渡そうとしています。

これは私のレコードリーダーコードです:

public class MultiImagesRecordReader extends
        RecordReader<Text[], BytesWritable[]> {
private long start = 0;
private long end = 0;
private int pos = 0;
private BytesWritable[] value;
private Text key[];
private CombineFileSplit split;
private Configuration conf;
private FileSystem fs;
private static boolean recordsRead;

public MultiImagesRecordReader(CombineFileSplit split,
        TaskAttemptContext context, Integer index) throws IOException {
    this.split = split;
    this.conf = context.getConfiguration();
}

@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
        throws IOException, InterruptedException {
    start = split.getOffset(0);
    end = start + split.getLength();
    recordsRead = false;
    this.pos = (int) start;
    fs = FileSystem.get(conf);
    value = new BytesWritable[split.getNumPaths()];
    key = new Text[split.getNumPaths()];
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (recordsRead == true) {
        System.out.println("Sono nel next true"+InetAddress.getLocalHost());
        return false;
    } else {
        recordsRead = true;
        System.out.println("Sono nel next false"+InetAddress.getLocalHost());
        for (int i = 0; i < split.getNumPaths(); i++) {

            int fileLength = (int) split.getLength(i);
            Path path = split.getPath(i);
            byte[] result = new byte[fileLength];

            FSDataInputStream in = null;

            String file_path = path.toString();
            key[i] = new Text(file_path);
            try {
                in = fs.open(path);
                IOUtils.readFully(in, result, 0, fileLength);

            } finally {
                IOUtils.closeStream(in);
            }

            value[i] = new BytesWritable(result);
        }
        return true;
    }
}

このコードでは、マップ関数がキーと値のベクトルを正しく受け取りますが、繰り返し発生します。つまり、 map 関数は 1 回呼び出されると思っていましたが、代わりに複数回呼び出されました。私は何を間違っていますか?

4

1 に答える 1

0

指定されたすべてのキーと値のペアが終了するまで、Reader が から返す各レコードに対してmap()ofが呼び出されることを知っていると思います。map 関数が同じキー値のペアに対して繰り返し呼び出されることを理解しています (これは、単一のキー値のペアに対して 1 回呼び出されることになっています)。これは、レコード リーダーが同じレコード (キーと値のペア) を繰り返し読み取ることを意味します。また、カスタム結合ファイル入力形式とレコード リーダーも実装しました。同じプロジェクト内で、それらの一般的な形式をここで、実装をここで見ることができます。MappercurrentKey()currentValue()Split

于 2014-02-19T09:48:08.167 に答える