たくさんのJobControlsを同時に実行していて、すべて同じセットのControlledJobsを使用しています。各JobControlは、日付範囲ごとに異なる入出力ファイルのセットを処理しますが、それらはすべてタイプです。私が観察している問題は、reduceステップが、異なる日付範囲を処理するreducerによって処理されるように設計されたデータを受信していることです。日付範囲はジョブによって設定され、入力と出力を決定するために使用され、レデューサー内のコンテキストから読み取られます。
JobControlsを順番に送信すると、これは停止しますが、それは良くありません。これは、カスタムパーティショナーで解決する必要があるものですか?現在の日付範囲を処理しているレデューサーがわからない場合、キーの正しいレデューサーをどのように決定すればよいですか?インスタンス化されたレデューサーがJobControlにロックされないのはなぜですか?
私は、Javaでの基本実装に対して、すべてのJobControls、Jobs、Maps、Reduceを記述しています。
糸で2.0.3-alphaを使用しています。それはそれと何か関係がありますか?
コードの共有には少し注意する必要がありますが、ここにサニタイズされたマッパーがあります。
protected void map(LongWritable key, ProtobufWritable<Model> value, Context context)
throws IOException, InterruptedException {
context.write(new Text(value.get().getSessionId()),
new ProtobufModelWritable(value.get()));
}
そしてレデューサー:
protected void reduce(Text sessionId, Iterable<ProtobufModelWritable> models, Context context)
throws IOException, InterruptedException {
Interval interval = getIntervalFromConfig(context);
Model2 model2 = collapseModels(Iterables.transform(models, TO_MODEL));
Preconditions.checkArgument(interval.contains(model2.getTimeStamp()),
"model2: " + model2 + " does not belong in " + interval);
}
private Interval getIntervalFromConfig(Context context) {
String i = context.getConfiguration().get(INTERVAL_KEY);
return Utils.interval(i);
}