2

Hadoop map reduce で単純なハッシュ結合プログラムを作成しました。アイデアは次のとおりです。

Hadoop フレームワークが提供する DistributedCache を使用して、小さなテーブルがすべてのマッパーに配布されます。大きなテーブルは、分割サイズが 64M のマッパーに分散されます。マッパーのセットアップ コードは、この小さなテーブルからすべての行を読み取るハッシュマップを作成します。マッパーコードでは、ハッシュマップ上ですべてのキーを検索(取得)し、キーがハッシュマップに存在する場合はそれを書き出します。この時点ではレデューサーは必要ありません。これは私たちが使用するコードです:

    public class Map extends Mapper<LongWritable, Text, Text, Text> {
        private HashMap<String, String> joinData = new HashMap<String, String>();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String textvalue = value.toString();
            String[] tokens;
            tokens = textvalue.split(",");
            if (tokens.length == 2) {
                String joinValue = joinData.get(tokens[0]);
                if (null != joinValue) {
                    context.write(new Text(tokens[0]), new Text(tokens[1] + ","
                            + joinValue));
                }
            }
        }

    public void setup(Context context) {
        try {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context
                    .getConfiguration());
            if (null != cacheFiles && cacheFiles.length > 0) {
                String line;
                String[] tokens;
                BufferedReader br = new BufferedReader(new FileReader(
                        cacheFiles[0].toString()));
                try {
                    while ((line = br.readLine()) != null) {

                        tokens = line.split(",");
                        if (tokens.length == 2) {
                            joinData.put(tokens[0], tokens[1]);
                        }
                    }
                    System.exit(0);
                } finally {
                    br.close();
                }
            }

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

このコードをテストしている間、小さなテーブルは 32M、大きなテーブルは 128M で、1 つのマスター ノードと 2 つのスレーブ ノードでした。

256M のヒープがある場合、このコードは上記の入力で失敗します。mapred-site.xml ファイルの mapred.child.java.opts で -Xmx256m を使用します。300m に増やすと、進行が非常に遅くなり、512m で最大スループットに達します。

マッパーがどこで大量のメモリを消費しているのかわかりません。上記の入力とマッパー コードを使用すると、ヒープ メモリが 256M に達するとは思えませんが、Java ヒープ スペース エラーで失敗します。

マッパーが大量のメモリを消費している理由を教えていただければ幸いです。

編集

13/03/11 09:37:33 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/03/11 09:37:33 INFO input.FileInputFormat: Total input paths to process : 1
13/03/11 09:37:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/03/11 09:37:33 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/11 09:37:34 INFO mapred.JobClient: Running job: job_201303110921_0004
13/03/11 09:37:35 INFO mapred.JobClient:  map 0% reduce 0%
13/03/11 09:39:12 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000000_0, Status : FAILED
Error: GC overhead limit exceeded
13/03/11 09:40:43 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000001_0, Status : FAILED
org.apache.hadoop.io.SecureIOUtils$AlreadyExistsException: File /usr/home/hadoop/hadoop-1.0.3/libexec/../logs/userlogs/job_201303110921_0004/attempt_201303110921_0004_m_000001_0/log.tmp already exists
    at org.apache.hadoop.io.SecureIOUtils.insecureCreateForWrite(SecureIOUtils.java:130)
    at org.apache.hadoop.io.SecureIOUtils.createForWrite(SecureIOUtils.java:157)
    at org.apache.hadoop.mapred.TaskLog.writeToIndexFile(TaskLog.java:312)
    at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:385)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:257)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

attempt_201303110921_0004_m_000001_0: Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError: Java heap space
attempt_201303110921_0004_m_000001_0:   at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:76)
attempt_201303110921_0004_m_000001_0:   at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:59)
attempt_201303110921_0004_m_000001_0:   at org.apache.hadoop.mapred.TaskLog.writeToIndexFile(TaskLog.java:312)
attempt_201303110921_0004_m_000001_0:   at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:385)
attempt_201303110921_0004_m_000001_0:   at org.apache.hadoop.mapred.Child$3.run(Child.java:141)
attempt_201303110921_0004_m_000001_0: log4j:WARN No appenders could be found for logger (org.apache.hadoop.hdfs.DFSClient).
attempt_201303110921_0004_m_000001_0: log4j:WARN Please initialize the log4j system properly.
13/03/11 09:42:18 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000001_1, Status : FAILED
Error: GC overhead limit exceeded
13/03/11 09:43:48 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000001_2, Status : FAILED
Error: GC overhead limit exceeded
13/03/11 09:45:09 INFO mapred.JobClient: Job complete: job_201303110921_0004
13/03/11 09:45:09 INFO mapred.JobClient: Counters: 7
13/03/11 09:45:09 INFO mapred.JobClient:   Job Counters 
13/03/11 09:45:09 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=468506
13/03/11 09:45:09 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/11 09:45:09 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/03/11 09:45:09 INFO mapred.JobClient:     Launched map tasks=6
13/03/11 09:45:09 INFO mapred.JobClient:     Data-local map tasks=6
13/03/11 09:45:09 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
13/03/11 09:45:09 INFO mapred.JobClient:     Failed map tasks=1
4

1 に答える 1

4

メモリ消費がどこに向かっているのかを確実に言うのは難しいですが、いくつかの指針を以下に示します。

  • Text入力の行ごとに2 つのオブジェクトを作成しています。Textasクラス変数で一度初期化される2つのオブジェクトを使用するだけMapperで、各行でtext.set(...). これは Map/Reduce パターンの一般的な使用パターンであり、かなりのメモリ オーバーヘッドを節約できます。
  • SequenceFile入力に ​​format を使用することを検討する必要があります。これにより、行をtextValue.splitで解析する必要がなくなります。代わりに、このデータを配列として直接利用できます。このような文字列分割を行うと非常に負荷がかかる可能性があることを何度か読んだことがあります。そのため、メモリが本当に問題になる場合は、できるだけ回避する必要があります。KeyValueTextInputFormatあなたの例のように、キーと値のペアだけを気にする場合は、使用することも考えられます。

それだけでは不十分な場合は、このリンク、特にパート 7 を参照することをお勧めします。これにより、アプリケーションのプロファイルを作成し、何がどこに割り当てられているかを確認するための非常に簡単な方法が得られます。

于 2013-03-09T23:40:39.207 に答える