ファイルのグループで構成される Map タスクの分割を作成するために、カスタムの結合ファイル入力形式を実装しました。スプリットの各ファイルをレコードリーダーに渡すソリューションを作成しましたが、すべて問題ありません。今、マップ関数にファイルのセット全体を渡そうとしています。
これは私のレコードリーダーコードです:
public class MultiImagesRecordReader extends
RecordReader<Text[], BytesWritable[]> {
private long start = 0;
private long end = 0;
private int pos = 0;
private BytesWritable[] value;
private Text key[];
private CombineFileSplit split;
private Configuration conf;
private FileSystem fs;
private static boolean recordsRead;
public MultiImagesRecordReader(CombineFileSplit split,
TaskAttemptContext context, Integer index) throws IOException {
this.split = split;
this.conf = context.getConfiguration();
}
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
start = split.getOffset(0);
end = start + split.getLength();
recordsRead = false;
this.pos = (int) start;
fs = FileSystem.get(conf);
value = new BytesWritable[split.getNumPaths()];
key = new Text[split.getNumPaths()];
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (recordsRead == true) {
System.out.println("Sono nel next true"+InetAddress.getLocalHost());
return false;
} else {
recordsRead = true;
System.out.println("Sono nel next false"+InetAddress.getLocalHost());
for (int i = 0; i < split.getNumPaths(); i++) {
int fileLength = (int) split.getLength(i);
Path path = split.getPath(i);
byte[] result = new byte[fileLength];
FSDataInputStream in = null;
String file_path = path.toString();
key[i] = new Text(file_path);
try {
in = fs.open(path);
IOUtils.readFully(in, result, 0, fileLength);
} finally {
IOUtils.closeStream(in);
}
value[i] = new BytesWritable(result);
}
return true;
}
}
このコードでは、マップ関数がキーと値のベクトルを正しく受け取りますが、繰り返し発生します。つまり、 map 関数は 1 回呼び出されると思っていましたが、代わりに複数回呼び出されました。私は何を間違っていますか?