CDH4 クラスターで実行するための MapReduce コードを作成しました。私の要件は、完全なファイルを値として、ファイル名をキーとして読み取ることでした。そのために、カスタムの InputFormat および RecordReader クラスを作成しました。
カスタム入力フォーマット クラス: FullFileInputFormat.java
import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import FullFileRecordReader;
public class FullFileInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf jobConf, Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
return new FullFileRecordReader((FileSplit) split, jobConf);
}
}
カスタム RecordReader クラス: FullFileRecordReader.java
import java.io.BufferedReader;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
public class FullFileRecordReader implements RecordReader<Text, Text> {
private BufferedReader in;
private boolean processed = false;
private int processedBytes = 0;
private FileSplit fileSplit;
private JobConf conf;
public FullFileRecordReader(FileSplit fileSplit, JobConf conf) {
this.fileSplit = fileSplit;
this.conf = conf;
}
@Override
public void close() throws IOException {
if (in != null) {
in.close();
}
}
@Override
public Text createKey() {
return new Text("");
}
@Override
public Text createValue() {
return new Text("");
}
@Override
public long getPos() throws IOException {
return processedBytes;
}
@Override
public boolean next(Text key, Text value) throws IOException {
Path filePath = fileSplit.getPath();
if (!processed) {
key = new Text(filePath.getName());
value = new Text("");
FileSystem fs = filePath.getFileSystem(conf);
FSDataInputStream fileIn = fs.open(filePath);
byte[] b = new byte[1024];
int numBytes = 0;
while ((numBytes = fileIn.read(b)) > 0) {
value.append(b, 0, numBytes);
processedBytes += numBytes;
}
processed = true;
return true;
}
return false;
}
@Override
public float getProgress() throws IOException {
return 0;
}
}
RecordReader クラスでキー値を出力しようとすると値が取得されますが、マッパー クラスで同じ値を出力すると空白の値が表示されます。Mapper クラスがキーと値のデータを取得できない理由がわかりません。
現在、マップ ジョブのみがあり、リデュース ジョブはありません。コードは次のとおりです。
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import FullFileInputFormat;
public class Source {
public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws java.io.IOException {
System.out.println("Processing " + key.toString());
System.out.println("Value: " + value.toString());
}
}
public static void main(String[] args) throws Exception {
JobConf job = new JobConf(Source.class);
job.setJobName("Source");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setJarByClass(Source.class);
job.setInputFormat(FullFileInputFormat.class);
job.setMapperClass(Map.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
JobClient.runJob(job);
}
}