私は Hadoop を初めて使用し、5 ノード クラスターで複数の mapReduce ジョブを実行しています。複数のスレッドを実行すると、「ファイルシステムが閉じられました」という例外が発生し始めました。一度に 1 つずつ実行すると、ジョブは正常に機能します。エラーは、マッピングの直後、削減の直前に発生します。次のようになります。
java.lang.Exception: java.io.IOException: Filesystem closed
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:399)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:552)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:648)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:167)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:526)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:338)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
これは常に発生するわけではなく、失敗したジョブを再実行すると問題なく実行されます。残念ながら、これには時間がかかりすぎます。これは、同じ入力ファイルにアクセスする複数のタスクに関係していると想定しています。1 つのタスクが終了すると、すべてのタスクの入力ファイルが閉じられます。これが問題である場合、これをオーバーライドする方法を知りたいです。マッパー内でクリーンアップをオーバーライドしてパスを再度開くことを試みましたが、これはばかげているようで機能しません。
@Override
public void cleanup(Context context){
Job tempJob;
try {
tempJob = new Job();
Path fs = ((FileSplit) context.getInputSplit()).getPath();
FileInputFormat.addInputPath(tempJob, fs);
System.out.println("Finished map task for " + context.getJobName());
} catch (IOException e) {
e.printStackTrace();
}
}
これが、スレッドプールを使用して Hadoop mapReduce ジョブを実行する際の根本的な問題であるかどうかも疑問に思っています。アイデアをありがとう。
編集: ジョブとタスクについて言及していたとき、私は少し不明確だったかもしれません. 私は実際に、独自のマッパーとリデューサーを使用して複数のジョブを実行しています。これらの各ジョブは、作成中の特定のテーブルの列を生成します。合計またはカウントを言います。各ジョブには独自のスレッドがあり、それらはすべて同じ入力ファイルにアクセスしています。私が抱えている問題は、一部のジョブが終了すると、「Filesystem closed Exception」がスローされることです。それが違いを生む可能性がある場合は、Yarnも使用しています。