多くの入力ファイルに対して Hadoop ジョブを実行しています。ただし、ファイルの 1 つが破損していると、ジョブ全体が失敗します。
破損したファイルを無視するようにジョブを作成するにはどうすればよいですか? カウンター/エラーログを書いてくれるかもしれませんが、ジョブ全体が失敗することはありません
ジョブがどこで失敗しているかによって異なります。行が破損していて、マップ メソッドのどこかで例外がスローされた場合は、マップ メソッドの本体を try / catch でラップし、エラーをログに記録するだけで済みます。
protected void map(LongWritable key, Text value, Context context) {
try {
// parse value to a long
int val = Integer.parseInt(value.toString());
// do something with key and val..
} catch (NumberFormatException nfe) {
// log error and continue
}
}
ただし、InputFormat の RecordReader によってエラーがスローされた場合は、マッパーrun(..)
メソッドを修正する必要があります。デフォルトの実装は次のとおりです。
public void run(Context context) {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
したがって、これを修正して呼び出しで例外をキャッチすることもできcontext.nextKeyValue()
ますが、リーダーによってスローされたエラーを無視することに注意する必要があります。
独自の InputFormat / RecordReader を作成していて、レコードの失敗を示す特定の例外があるが、スキップして解析を続行できる場合は、おそらく次のようなものが機能します。
public void run(Context context) {
setup(context);
while (true) {
try {
if (!context.nextKeyValue()) {
break;
} else {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} catch (SkippableRecordException sre) {
// log error
}
}
cleanup(context);
}
ただし、繰り返しますが、RecordReader はエラー時に回復できる必要があります。そうしないと、上記のコードによって無限ループに陥る可能性があります。
特定のケースでは、最初の失敗時にファイルを無視したい場合は、 run メソッドをもっと簡単なものに更新できます。
public void run(Context context) {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
} catch (Exception e) {
// log error
}
}
警告の最後の言葉:
これは、カスケードで失敗トラップが使用されるものです。
操作が失敗して例外がスローされるたびに、関連するトラップがあれば、問題のタプルはトラップ Tap で指定されたリソースに保存されます。これにより、ジョブはデータを失うことなく処理を続行できます。
これにより、基本的にジョブを続行でき、後で破損したファイルをチェックできます
フロー定義ステートメントでのカスケードにある程度慣れている場合:
new FlowDef().addTrap( String branchName, Tap trap );
別の方法も考えられます。mapred.max.map.failures.percent
構成オプションを使用できます。もちろん、この問題を解決するこの方法は、マップ フェーズ中に発生する他の問題を隠すこともできます。