1

私は Hadoop を初めて使用し、5 ノード クラスターで複数の mapReduce ジョブを実行しています。複数のスレッドを実行すると、「ファイルシステムが閉じられました」という例外が発生し始めました。一度に 1 つずつ実行すると、ジョブは正常に機能します。エラーは、マッピングの直後、削減の直前に発生します。次のようになります。

java.lang.Exception: java.io.IOException: Filesystem closed
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:399)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:552)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:648)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:167)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:526)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:338)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

これは常に発生するわけではなく、失敗したジョブを再実行すると問題なく実行されます。残念ながら、これには時間がかかりすぎます。これは、同じ入力ファイルにアクセスする複数のタスクに関係していると想定しています。1 つのタスクが終了すると、すべてのタスクの入力ファイルが閉じられます。これが問題である場合、これをオーバーライドする方法を知りたいです。マッパー内でクリーンアップをオーバーライドしてパスを再度開くことを試みましたが、これはばかげているようで機能しません。

@Override 
public void cleanup(Context context){
        Job tempJob;
        try {
            tempJob = new Job();
            Path fs = ((FileSplit) context.getInputSplit()).getPath();
            FileInputFormat.addInputPath(tempJob, fs);
            System.out.println("Finished map task for " + context.getJobName());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

これが、スレッドプールを使用して Hadoop mapReduce ジョブを実行する際の根本的な問題であるかどうかも疑問に思っています。アイデアをありがとう。

編集: ジョブとタスクについて言及していたとき、私は少し不明確だったかもしれません. 私は実際に、独自のマッパーとリデューサーを使用して複数のジョブを実行しています。これらの各ジョブは、作成中の特定のテーブルの列を生成します。合計またはカウントを言います。各ジョブには独自のスレッドがあり、それらはすべて同じ入力ファイルにアクセスしています。私が抱えている問題は、一部のジョブが終了すると、「Filesystem closed Exception」がスローされることです。それが違いを生む可能性がある場合は、Yarnも使用しています。

4

1 に答える 1

1

原則として、CPU を集中的に使用するジョブがない限り、同じタスク内で複数のスレッドを使用することはお勧めしません。JVM で問題が発生する可能性が高くなり、タスクを再実行するコストがはるかに高くなります。もちろん、各タスクは個別の JVM で実行されますが、その方がはるかにクリーンです。

本当にマルチスレッドの方法を使用したい場合は、間違ったタイプのマッパーを使用していると思われます.マルチスレッドアプリケーションの場合MultithreadedMapper、メソッドの実装が異なり、runスレッドセーフである必要があります。次のように使用できます。

job.setMapperClass(MultithreadedMapper.class);

次のようにスレッド数を指定できます。

int numThreads = 42;
MultithreadedMapper.setNumberOfThreads(numThreads);
于 2013-03-13T01:51:58.220 に答える