これを Hadoop ユーザーのメーリング リストとここにクロスポストして申し訳ありませんが、これは私にとって緊急の問題になっています。
私の問題は次のとおりです。2つの入力ファイルがあり、特定したい
- a) ファイル 1 にのみ出現する行数
- b) ファイル 2 にのみ出現する行数
- c) 両方に共通の行数 (例: 文字列の等価性に関して)
例:
File 1:
a
b
c
File 2:
a
d
各ケースの望ましい出力:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
基本的に私のアプローチは次のとおりです。マッパーが行 (テキスト) とソース ファイルを示すバイト (0 または 1) で構成されるペアを受け取るように、独自の LineRecordReader を作成しました。マッパーは再びペアを返すだけなので、実際には何もしません。ただし、副作用は、コンバイナが
Map<Line, Iterable<SourceId>>
(ここで、SourceId は 0 または 1 です)。
これで、行ごとに、それが表示されるソースのセットを取得できます。したがって、ケース (a、b、c) ごとに行数をカウントするコンバイナーを作成できます (リスト 1)。
次に、コンバイナーはクリーンアップ時にのみ「概要」を出力します (それは安全ですか?)。したがって、この要約は次のようになります。
lines_only_in_1 2531
lines_only_in_2 3190
lines_in_both 901
レデューサーでは、これらの集計の値のみを合計します。(したがって、レデューサーの出力は、コンバイナーの出力とまったく同じように見えます)。
ただし、主な問題は、両方のソース ファイルを単一の仮想ファイルとして扱い、(line, sourceId) // sourceId 0 または 1 の形式のレコードを生成する必要があることです。
そして、それを達成する方法がわかりません。したがって、問題は、事前にファイルの前処理とマージを回避し、仮想マージ ファイル リーダーやカスタム レコード リーダーのようなものを使用してオンザフライで実行できるかどうかです。コード例は大歓迎です。
よろしく、クラウス
リスト 1:
public static class SourceCombiner
extends Reducer<Text, ByteWritable, Text, LongWritable> {
private long countA = 0;
private long countB = 0;
private long countC = 0; // C = lines (c)ommon to both sources
@Override
public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
Set<Byte> fileIds = new HashSet<Byte>();
for (ByteWritable val : values) {
byte fileId = val.get();
fileIds.add(fileId);
}
if(fileIds.contains((byte)0)) { ++countA; }
if(fileIds.contains((byte)1)) { ++countB; }
if(fileIds.size() >= 2) { ++countC; }
}
protected void cleanup(Context context)
throws java.io.IOException, java.lang.InterruptedException
{
context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
}
}