MapperOne、ReducerOne、MapperTwo、ReducerTwo という 4 つのクラスがあります。これらの間にチェーンが必要です。MapperOne-->ReducerOne-->出力ファイル MapperTwo-->MapperTwo-->ReducerTwo-->Final Output File に入力される生成。
私のドライバークラスコード:
public class StockDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
System.out.println(" Driver invoked------");
Configuration config = new Configuration();
config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
config.set("mapred.textoutputformat.separator", " --> ");
String inputPath="In\\NYSE_daily_prices_Q_less.csv";
String outpath = "C:\\Users\\Outputs\\run1";
String outpath2 = "C:\\UsersOutputs\\run2";
Job job1 = new Job(config,"Stock Analysis: Creating key values");
job1.setInputFormatClass(TextInputFormat.class);
job1.setOutputFormatClass(TextOutputFormat.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(StockDetailsTuple.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
job1.setMapperClass(StockMapperOne.class);
job1.setReducerClass(StockReducerOne.class);
FileInputFormat.setInputPaths(job1, new Path(inputPath));
FileOutputFormat.setOutputPath(job1, new Path(outpath));
//THE SECOND MAP_REDUCE TO DO CALCULATIONS
Job job2 = new Job(config,"Stock Analysis: Calculating Covariance");
job2.setInputFormatClass(TextInputFormat.class);
job2.setOutputFormatClass(TextOutputFormat.class);
job2.setMapOutputKeyClass(LongWritable.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
job2.setMapperClass(StockMapperTwo.class);
job2.setReducerClass(StockReducerTwo.class);
String outpath3=outpath+"\\part-r-00000";
System.out.println("OUT PATH 3: " +outpath3 );
FileInputFormat.setInputPaths(job2, new Path(outpath3));
FileOutputFormat.setOutputPath(job2, new Path(outpath2));
if(job1.waitForCompletion(true)){
System.out.println(job2.waitForCompletion(true));
}
}
}
MapperOne と ReducerOne が適切に実行され、出力ファイルが適切なパスに保存されます。2 番目のジョブが実行されると、レデューサーのみが呼び出されます。以下は私の MapperTwo と ReducerTwo のコードです。
マッパー2
public class StockMapperTwo extends Mapper<Text, Text, LongWritable, Text> {
public void map(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
System.out.println("------ MAPPER 2 CALLED-----");
for(Text val: values){
System.out.println("KEY: "+ key.toString() + " VALUE: "+ val.toString());
//context.write(new Text("mapper2"), new Text("hi"));
context.write(new LongWritable(2), new Text("hi"));
}
}
}
レデューサー2
public class StockReducerTwo extends Reducer<LongWritable, Text, Text, Text>{
public void reduce(LongWritable key, Iterable<Text>values, Context context) throws IOException, InterruptedException{
System.out.println(" REDUCER 2 INVOKED");
context.write(new Text("hello"), new Text("hi"));
}
}
この構成に対する私の疑問は
job2.setMapperClass(StockMapperTwo.class) に設定されているにもかかわらず、マッパーがスキップされる理由
設定しないと
job2.setMapOutputKeyClass(LongWritable.class); job2.setMapOutputValueClass(Text.class);
、レデューサーも呼び出されません。そして、このエラーが来ています。
java.io.IOException: マップからのキーの型の不一致: 予想される org.apache.hadoop.io.Text、org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask で org.apache.hadoop.io.LongWritable を受け取りました.java:870) org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:573) org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) org.apache. hadoop.mapreduce.Mapper.map(Mapper.java:124) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
これはどのように起こっていますか?マッパーとリデューサーを適切に呼び出すことができません。