5

これを Hadoop ユーザーのメーリング リストとここにクロスポストして申し訳ありませんが、これは私にとって緊急の問題になっています。

私の問題は次のとおりです。2つの入力ファイルがあり、特定したい

  • a) ファイル 1 にのみ出現する行数
  • b) ファイル 2 にのみ出現する行数
  • c) 両方に共通の行数 (例: 文字列の等価性に関して)

例:

File 1:
a
b
c

File 2:
a
d

各ケースの望ましい出力:

lines_only_in_1: 2         (b, c)
lines_only_in_2: 1         (d)
lines_in_both:   1         (a)

基本的に私のアプローチは次のとおりです。マッパーが行 (テキスト) とソース ファイルを示すバイト (0 または 1) で構成されるペアを受け取るように、独自の LineRecordReader を作成しました。マッパーは再びペアを返すだけなので、実際には何もしません。ただし、副作用は、コンバイナが

Map<Line, Iterable<SourceId>>

(ここで、SourceId は 0 または 1 です)。

これで、行ごとに、それが表示されるソースのセットを取得できます。したがって、ケース (a、b、c) ごとに行数をカウントするコンバイナーを作成できます (リスト 1)。

次に、コンバイナーはクリーンアップ時にのみ「概要」を出力します (それは安全ですか?)。したがって、この要約は次のようになります。

lines_only_in_1   2531
lines_only_in_2   3190
lines_in_both      901

レデューサーでは、これらの集計の値のみを合計します。(したがって、レデューサーの出力は、コンバイナーの出力とまったく同じように見えます)。

ただし、主な問題は、両方のソース ファイルを単一の仮想ファイルとして扱い、(line, sourceId) // sourceId 0 または 1 の形式のレコードを生成する必要があることです。

そして、それを達成する方法がわかりません。したがって、問題は、事前にファイルの前処理とマージを回避し、仮想マージ ファイル リーダーやカスタム レコード リーダーのようなものを使用してオンザフライで実行できるかどうかです。コード例は大歓迎です。

よろしく、クラウス

リスト 1:

public static class SourceCombiner
    extends Reducer<Text, ByteWritable, Text, LongWritable> {

    private long countA = 0;
    private long countB = 0;
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            byte fileId = val.get();

            fileIds.add(fileId);
        }

        if(fileIds.contains((byte)0)) { ++countA; }
        if(fileIds.contains((byte)1)) { ++countB; }
        if(fileIds.size() >= 2) { ++countC; }
    }

    protected void cleanup(Context context)
            throws java.io.IOException, java.lang.InterruptedException
    {
        context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
    }
}
4

1 に答える 1

2

さて、私はあなたがこれまでに試したことの要点を実際には理解していなかったことを認めなければなりませんが、私はあなたが必要とするかもしれないことをするための簡単なアプローチを持っています。

ファイルマッパーを見てください。これはファイル名を取得し、入力の各行で送信します。

    public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

        static Text fileName;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, fileName);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {

            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            fileName = new Text(name);
        }
    }

これで、次のようなキー/値の束ができました(例に関して)

    a File 1
    b File 1
    c File 1

    a File 2
    d File 2

明らかにそれらを減らすと、次のような入力が得られます。

    a File 1,File 2
    b File 1
    c File 1
    d File 2

レデューサーで行う必要があることは、次のようになります。

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    enum Counter {
        LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        HashSet<String> set = new HashSet<String>();
        for (Text t : values) {
            set.add(t.toString());
        }

        // if we have only two files and we have just two records in our hashset
        // the line is contained in both files
        if (set.size() == 2) {
            context.getCounter(Counter.LINES_IN_COMMON).increment(1);
        } else {
            // sorry this is a bit dirty...
            String t = set.iterator().next();
            // determine which file it was by checking for the name:
            if(t.toString().equals("YOUR_FIRST_FILE_NAME")){
                context.getCounter(Counter.LINES_IN_FIRST).increment(1);
            } else {
                context.getCounter(Counter.LINES_IN_SECOND).increment(1);
            }
        }
    }

}

ifステートメント内の文字列をファイル名に置き換える必要があります。

ジョブカウンターを使用する方が、独自のプリミティブを使用してクリーンアップでコンテキストに書き込むよりも少し明確だと思います。完了後に次のようなものを呼び出すことで、ジョブのカウンターを取得できます。

Job job = new Job(new Configuration());
//setup stuff etc omitted..
job.waitForCompletion(true);
// do the same line with the other enums
long linesInCommon = job.getCounters().findCounter(Counter.LINES_IN_COMMON).getValue();

それでもなお、HDFSで共通の回線数などが必要な場合は、ソリューションを選択してください。

それがあなたを助けたことを願っています。

于 2011-06-24T19:12:23.177 に答える