I am generating csv files from my map function. I want each map task to generate one csv file; this is a side effect, not the mapper's output. I name these files like filename_inputkey. However, when I run the application on a single-node cluster, only one file is generated. My input has 10 lines, and my understanding is that there should be 10 mapper tasks and therefore 10 files. Please tell me if I am thinking about this the wrong way.
Here is my GWASInputFormat class:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class GWASInputFormat extends FileInputFormat<LongWritable, GWASGenotypeBean> {

    @Override
    public RecordReader<LongWritable, GWASGenotypeBean> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
        return new GWASRecordReader(job, (FileSplit) input);
    }
}
Here is the GWASRecordReader:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

// Wraps a LineRecordReader and parses each comma-separated line into a GWASGenotypeBean.
public class GWASRecordReader implements RecordReader<LongWritable, GWASGenotypeBean> {

    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public GWASRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null) {
            lineReader.close();
        }
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public GWASGenotypeBean createValue() {
        return new GWASGenotypeBean();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public boolean next(LongWritable key, GWASGenotypeBean value) throws IOException {
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }
        // copy the byte offset read by the wrapped reader into the caller's key;
        // without this the key handed to the mapper is never populated
        key.set(lineKey.get());
        String[] values = lineValue.toString().split(",");
        if (values.length != 32) {
            throw new IOException("Invalid record: expected 32 fields, got " + values.length);
        }
        value.setPROJECT_NAME(values[0]);
        value.setRESEARCH_CODE(values[1]);
        value.setFACILITY_CODE(values[2]);
        value.setPROJECT_CODE(values[3]);
        value.setINVESTIGATOR(values[4]);
        value.setPATIENT_NUMBER(values[5]);
        value.setSAMPLE_COLLECTION_DATE(values[6]);
        value.setGENE_NAME(values[7]);
        value.setDbSNP_RefSNP_ID(values[8]);
        value.setSNP_ID(values[9]);
        value.setALT_SNP_ID(values[10]);
        value.setSTRAND(values[11]);
        value.setASSAY_PLATFORM(values[12]);
        value.setSOFTWARE_NAME(values[13]);
        value.setSOFTWARE_VERSION_NUMBER(values[14]);
        value.setTEST_DATE(values[15]);
        value.setPLATE_POSITION(values[16]);
        value.setPLATE_ID(values[17]);
        value.setOPERATOR(values[18]);
        value.setGENOTYPE(values[19]);
        value.setGENOTYPE_QS1_NAME(values[20]);
        value.setGENOTYPE_QS2_NAME(values[21]);
        value.setGENOTYPE_QS3_NAME(values[22]);
        value.setGENOTYPE_QS4_NAME(values[23]);
        value.setGENOTYPE_QS5_NAME(values[24]);
        value.setGENOTYPE_QS1_RESULT(values[25]);
        value.setGENOTYPE_QS2_RESULT(values[26]);
        value.setGENOTYPE_QS3_RESULT(values[27]);
        value.setGENOTYPE_QS4_RESULT(values[28]);
        value.setGENOTYPE_QS5_RESULT(values[29]);
        value.setSTAGE(values[30]);
        value.setLAB(values[31]);
        return true;
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }
}
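GWASGenotypeBean is omitted for brevity; it is just a plain bean with a String property and a matching getter/setter for each of the 32 columns. Roughly (only a few of the properties spelled out, the rest follow the same pattern):

public class GWASGenotypeBean {

    // sketch: one String property per CSV column, 32 in total
    private String PROJECT_NAME;
    private String RESEARCH_CODE;
    // ... remaining properties elided ...
    private String LAB;

    public String getPROJECT_NAME() { return PROJECT_NAME; }
    public void setPROJECT_NAME(String projectName) { this.PROJECT_NAME = projectName; }
    public String getRESEARCH_CODE() { return RESEARCH_CODE; }
    public void setRESEARCH_CODE(String researchCode) { this.RESEARCH_CODE = researchCode; }
    // ... getters and setters for the remaining properties ...
    public String getLAB() { return LAB; }
    public void setLAB(String lab) { this.LAB = lab; }
}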
Here is the Mapper class:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.google.common.base.Strings;
public class GWASMapper extends MapReduceBase implements Mapper<LongWritable, GWASGenotypeBean, Text, Text> {

    private Configuration conf;

    // With the old mapred API the framework calls configure(JobConf), not the
    // new-API setup(Context), so the configuration is captured here instead.
    @Override
    public void configure(JobConf job) {
        conf = job;
        // Path[] otherFiles = DistributedCache.getLocalCacheFiles(job);
    }
    @Override
    public void map(LongWritable inputKey, GWASGenotypeBean inputValue, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        checkForNulls(inputValue, inputKey.toString());
        output.collect(new Text(inputValue.getPROJECT_CODE()), new Text(inputValue.getFACILITY_CODE()));
    }
    private void checkForNulls(GWASGenotypeBean user, String inputKey) {
        String f1 = " does not have a value_fail";
        String p1 = "Must not contain NULLS for required fields";
        // have to initialize these two to some paths in hdfs
        String edtChkRptDtl = "/user/hduser/output6/detail" + inputKey + ".csv";
        String edtChkRptSmry = "/user/hduser/output6/summary" + inputKey + ".csv";

        List<String> errSmry = new ArrayList<String>();
        Map<String, String> loc = new TreeMap<String, String>();
        // note: because of the else-if chain, only the first missing field is recorded
        if (Strings.isNullOrEmpty(user.getPROJECT_NAME())) {
            loc.put("test", "PROJECT_NAME ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getRESEARCH_CODE())) {
            loc.put("test", "RESEARCH_CODE ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getFACILITY_CODE())) {
            loc.put("test", "FACILITY_CODE ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getPROJECT_CODE())) {
            loc.put("test", "PROJECT_CODE ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getINVESTIGATOR())) {
            loc.put("test", "INVESTIGATOR ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getPATIENT_NUMBER())) {
            loc.put("test", "PATIENT_NUMBER ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getSAMPLE_COLLECTION_DATE())) {
            loc.put("test", "SAMPLE_COLLECTION_DATE ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getGENE_NAME())) {
            loc.put("test", "GENE_NAME ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getSTRAND())) {
            loc.put("test", "STRAND ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getASSAY_PLATFORM())) {
            loc.put("test", "ASSAY_PLATFORM ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getSOFTWARE_NAME())) {
            loc.put("test", "SOFTWARE_NAME ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getTEST_DATE())) {
            loc.put("test", "TEST_DATE ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getPLATE_POSITION())) {
            loc.put("test", "PLATE_POSITION ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getPLATE_ID())) {
            loc.put("test", "PLATE_ID ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getOPERATOR())) {
            loc.put("test", "OPERATOR ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getGENOTYPE())) {
            loc.put("test", "GENOTYPE ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getSTAGE())) {
            loc.put("test", "STAGE ");
            errSmry.add("_fail");
        } else if (Strings.isNullOrEmpty(user.getLAB())) {
            loc.put("test", "LAB ");
            errSmry.add("_fail");
        }

        String customNullMsg = "Required Genotype column(s)";
        List<String> error = new ArrayList<String>();
        String message = null;
        if (!loc.isEmpty()) {
            for (Map.Entry<String, String> entry : loc.entrySet()) {
                message = "line:" + entry.getKey() + " column:" + entry.getValue() + " " + f1;
                error.add(message);
            }
        } else {
            message = "_pass";
            error.add(message);
        }
        int cnt = 0;
        if (!errSmry.isEmpty()) {
            // not able to understand this: are we trying to count the occurrences
            // of the first key that contains _fail?
            for (String key : errSmry) {
                if (key.contains("_fail")) {
                    cnt = Collections.frequency(errSmry, key);
                    // ******************** Nikhil added this
                    break;
                }
            }
            if (cnt > 0) {
                writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "failed", Integer.toString(cnt));
            } else {
                writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "passed", "0");
            }
        } else {
            writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "passed", "0");
        }
        // loop the list and write out items to the error report file
        if (!error.isEmpty()) {
            for (String s : error) {
                //System.out.println(s);
                if (s.contains("_fail")) {
                    String updatedFailmsg = s.replace("_fail", "");
                    writeCsvFileDtl(edtChkRptDtl, "genotype", updatedFailmsg, "failed");
                }
                if (s.contains("_pass")) {
                    writeCsvFileDtl(edtChkRptDtl, "genotype", p1, "passed");
                }
            }
        } else {
            writeCsvFileDtl(edtChkRptDtl, "genotype", p1, "passed");
        }
        // end loop
    }
    private void writeCsvFileDtl(String edtChkRptDtl, String col1, String col2, String col3) {
        try {
            if (conf == null) {
                conf = new Configuration();
            }
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(edtChkRptDtl);
            // only writes the file the first time; later calls for the same path are no-ops
            // (note: writeChars emits two bytes per character, so the csv ends up UTF-16 encoded)
            if (!fs.exists(path)) {
                FSDataOutputStream out = fs.create(path);
                out.writeChars(col1);
                out.writeChar(',');
                out.writeChars(col2);
                out.writeChar(',');
                out.writeChars(col3);
                out.writeChar('\n');
                out.flush();
                out.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private void writeCsvFileSmry(String edtChkRptSmry, String col1, String col2, String col3, String col4) {
        try {
            if (conf == null) {
                conf = new Configuration();
            }
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(edtChkRptSmry);
            if (!fs.exists(path)) {
                FSDataOutputStream out = fs.create(path);
                out.writeChars(col1);
                out.writeChar(',');
                out.writeChars(col2);
                out.writeChar(',');
                out.writeChars(col3);
                out.writeChar(',');
                out.writeChars(col4);
                out.writeChar('\n');
                out.flush();
                out.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
And here is my driver class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GWASMapReduce extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new GWASMapReduce(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        // use the configuration injected by ToolRunner; the class argument also sets the job jar
        JobConf conf = new JobConf(getConf(), GWASMapReduce.class);
        conf.setInputFormat(GWASInputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(GWASMapper.class);
        conf.setNumReduceTasks(0);
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
        return 0;
    }
}
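For completeness, this is roughly how I invoke the job (jar name and paths are just placeholders). The first argument is the input directory and the second is the job output directory; the side-effect csv files go to the /user/hduser/output6 path hard-coded in the mapper:

hadoop jar gwas.jar GWASMapReduce /user/hduser/gwasinput /user/hduser/gwasoutput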