0

9 GB の CSV ファイルを処理する必要があります。MR の間、いくつかのグループ化を行い、レガシー システム用の特別なフォーマットを生成する必要があります。

入力ファイルは次のようになります。

AppId;Username;Other Fields like timestamps...
app/10;Mr Foobar;...
app/10;d0x;...
app/10;Mr leet;...
app/110;kr1s;...
app/110;d0x;...
...

出力ファイルは次のように非常に単純です。

app/10;3;Mr Foobar;d0x;Mr leet
app/110;2;kr1s;d0x
^       ^ ^^^^^^^^
\ AppId \         \ A list with all users playing the game
         \
          \ Ammount of users

AppIdそれを解決するために、 as KeyUsernameas valueを返すマッパーを書きました。これにより、マッピング フェーズが正常に実行されます。

問題は削減フェーズで発生します。そこでIterator<Text> userIds、多くの userIds (>5.000.000) を持つリストを潜在的に含む を取得します。

これを処理するレデューサーは次のようになります。

public class UserToAppReducer extends Reducer<Text, Text, Text, UserSetWritable> {
    final UserSetWritable   userSet = new UserSetWritable();

    @Override
    protected void reduce(final Text appId, final Iterable<Text> userIds, final Context context) throws IOException, InterruptedException {
        this.userSet.clear();

        for (final Text userId : userIds) {
            this.userSet.add(userId.toString());

        }
        context.write(appId, this.userSet);
    }   
}

UserSetWritable、ユーザーのリストを格納する書き込み可能なカスタムです。これは、出力を生成するために必要です (キー = appId、値 = ユーザー名のリスト)。

電流UserSetWritableはこんな感じです。

 public class UserSetWritable implements Writable {
    private final Set<String>   userIds = new HashSet<String>();

    public void add(final String userId) {
        this.userIds.add(userId);
    }

    @Override
    public void write(final DataOutput out) throws IOException {
        out.writeInt(this.userIds.size());

        for (final String userId : this.userIds) {
            out.writeUTF(userId);
        }
    }

    @Override
    public void readFields(final DataInput in) throws IOException {
        final int size = in.readInt();

        for (int i = 0; i < size; i++) {
            this.userIds.add(readUTF);
        }

    }

    @Override
    public String toString() {
        String result = "";
        for (final String userId : this.userIds) {
            result += userId + "\t";
        }

        result += this.userIds.size();
        return result;
    }

    public void clear() {
        this.userIds.clear();
    }

 }

このアプローチでは、Java HeapOutOfMemory Exception が発生します。

Error: Java heap space
attempt_201303072200_0016_r_000002_0: WARN : mapreduce.Counters - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
attempt_201303072200_0016_r_000002_0: WARN : org.apache.hadoop.conf.Configuration - session.id is deprecated. Instead, use dfs.metrics.session-id
attempt_201303072200_0016_r_000002_0: WARN : org.apache.hadoop.conf.Configuration - slave.host.name is deprecated. Instead, use dfs.datanode.hostname
attempt_201303072200_0016_r_000002_0: FATAL: org.apache.hadoop.mapred.Child - Error running child : java.lang.OutOfMemoryError: Java heap space
attempt_201303072200_0016_r_000002_0:   at java.util.Arrays.copyOfRange(Arrays.java:3209)
attempt_201303072200_0016_r_000002_0:   at java.lang.String.<init>(String.java:215)
attempt_201303072200_0016_r_000002_0:   at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:542)
attempt_201303072200_0016_r_000002_0:   at java.nio.CharBuffer.toString(CharBuffer.java:1157)
attempt_201303072200_0016_r_000002_0:   at org.apache.hadoop.io.Text.decode(Text.java:394)
attempt_201303072200_0016_r_000002_0:   at org.apache.hadoop.io.Text.decode(Text.java:371)
attempt_201303072200_0016_r_000002_0:   at org.apache.hadoop.io.Text.toString(Text.java:273)
attempt_201303072200_0016_r_000002_0:   at     com.myCompany.UserToAppReducer.reduce(UserToAppReducer.java:21)
attempt_201303072200_0016_r_000002_0:   at     com.myCompany.UserToAppReducer.reduce(UserToAppReducer.java:1)
attempt_201303072200_0016_r_000002_0:   at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
attempt_201303072200_0016_r_000002_0:   at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
attempt_201303072200_0016_r_000002_0:   at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
attempt_201303072200_0016_r_000002_0:   at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
attempt_201303072200_0016_r_000002_0:   at java.security.AccessController.doPrivileged(Native Method)
attempt_201303072200_0016_r_000002_0:   at javax.security.auth.Subject.doAs(Subject.java:396)
attempt_201303072200_0016_r_000002_0:   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
attempt_201303072200_0016_r_000002_0:   at org.apache.hadoop.mapred.Child.main(Child.java:262)

UserToAppReducer.java:21この行は次のとおりです。this.userSet.add(userId.toString());

同じクラスターで、この豚のスクリプトを使用してデータを処理できます。

set job.name convertForLegacy
set default_parallel 4
data = load '/data/...txt' 

using PigStorage(',') 
as (appid:chararray,uid:chararray,...);
grp = group data by appid;
counter = foreach grp generate group, data.uid, COUNT(data);
store counter into '/output/....' using PigStorage(',');

では、この OutOfMemoryException を MapReduce で解決するにはどうすればよいでしょうか?

4

1 に答える 1

1

「大きな」値を書き出すための同様の質問:Hadoopのreduceステップからの大きな出力値の処理

この概念を使用して大規模なレコードを書き出す(10万人のユーザーの必要なCSVリストを取得する)ことに加えて、複合キー(アプリIDとユーザーID)とカスタムパーティショナーを使用して、すべての単一のアプリIDのキーは、レデューサーに到達します。

この要点が好きな人もいます(テストされていません)。

于 2013-03-09T01:01:12.293 に答える