現在、2 番目のジョブが最初のジョブの出力を分散キャッシュに追加する必要がある 2 つの Hadoop ジョブがあります。現在、手動で実行しているため、最初のジョブが終了したら、出力ファイルを引数として 2 番目のジョブに渡し、そのドライバーがそれをキャッシュに追加します。
最初のジョブは単純なマップのみのジョブであり、両方のジョブを順番に実行したときに 1 つのコマンドを実行できることを望んでいました。
最初のジョブの出力を分散キャッシュに入れて、2 番目のジョブに渡すことができるようにするためのコードを手伝ってくれる人はいますか?
ありがとう
編集:これはジョブ1の現在のドライバーです:
public class PlaceDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: PlaceMapper <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Place Mapper");
job.setJarByClass(PlaceDriver.class);
job.setMapperClass(PlaceMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
TextInputFormat.addInputPath(job, new Path(otherArgs[0]));
TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
これは job2 のドライバーです。ジョブ 1 の出力は、最初の引数としてジョブ 2 に渡され、キャッシュにロードされます。
public class LocalityDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: LocalityDriver <cache> <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Job Name Here");
DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),job.getConfiguration());
job.setNumReduceTasks(1); //TODO: Will change
job.setJarByClass(LocalityDriver.class);
job.setMapperClass(LocalityMapper.class);
job.setCombinerClass(TopReducer.class);
job.setReducerClass(TopReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
TextInputFormat.addInputPath(job, new Path(otherArgs[1]));
TextOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}