1

レデューサーの数を増やし続けています。1つのレデューサーを除いてすべてが迅速に実行されてジョブを終了しますが、最後の1つのレデューサーは、タスクトラッカーログに次のメッセージが表示されてマージステップでハングします。

Down to the last merge-pass, with 3 segments left of total size: 171207264 bytes

...そして、このステートメントに長時間とどまると、Javaヒープエラーがスローされ、一部のクリーニングが開始されますが、終了しません。

child.optsのメモリを3.5GBに増やし(この制限を超えることはできません)、マップ出力も圧縮しました。

原因は何でしょうか?

ドライバーコードは次のとおりです。

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapred.task.timeout", "6000000");
    conf.set("mapred.compress.map.output", "true");
    Job job = new Job(conf, "FreebasePreprocess_Phase2");
    job.setNumReduceTasks(6);
    job.setJarByClass(FreebasePreprocess.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path("/user/watsonuser/freebase_data100m120m_output"));
    FileOutputFormat.setOutputPath(job, new Path("/user/watsonuser/freebase_data100m120m_output_2"));

    job.waitForCompletion(true);
}

マッパーは次のとおりです。

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;


public class Map extends Mapper<LongWritable, Text, Text, Text>{

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
{
    String[] entities = value.toString().split("\\t");
    String[] strings = {"/type/object/type", "/common/topic/notable_for", "/type/user/usergroup"};
    List<String> filteredPredicates = Arrays.asList(strings);

    FileSplit fileSplit = (FileSplit)context.getInputSplit();
    String filename = fileSplit.getPath().getName();
    //      System.out.println("File name "+filename);

    if(filename.startsWith("part-r")) {
        //      if(filename.equalsIgnoreCase("quad.tsv")) {
        //this is a quad dump file
        String name = null;
        String predicate = null;
        String oid = null;
        String outVal = null;
        String outKey = null;
        if(entities.length==3) {
            oid = entities[0].trim();
            predicate = entities[1].trim();
            name = entities[2].trim();

            /*if(predicate.contains("/type/object/name/lang"))
            {
                if(predicate.endsWith("/en")) 
                {*/
                /*outKey = sid;
                outVal = oid+"#-#-#-#"+"topic_name";
                context.write(new Text(outKey), new Text(outVal));*/
            /*  }
            }*/
                outKey = oid;
                outVal = predicate+"#-#-#-#"+name;
                context.write(new Text(outKey), new Text(outVal));

        }
    }

    else if(filename.equalsIgnoreCase("freebase-simple-topic-dump.tsv")) {
        //this is a simple topic dump file
        String sid = null;
        String name = null;
        String outKey = null;
        String outVal = null;
        if(entities.length>1) {
            sid = entities[0];
            name = entities[1];
            outKey = sid;
            outVal = name+"#-#-#-#"+"topic_name";
            context.write(new Text(outKey), new Text(outVal));
        }
    }
}

}

これがレデューサーです

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class Reduce extends Reducer<Text, Text, Text, Text> 
{

public void reduce(Text key, Iterable<Text> values, Context context) 
        throws IOException, InterruptedException 
        {
            String name = null; 
            String sid = null;
            String predicate = null;
            String oid = null;
            String id = null;
            String outKey = null;
            String outVal = null;

            ArrayList<Text> valuesList = new ArrayList<Text>();
            Iterator<Text> ite = values.iterator();
            while(ite.hasNext()) {
                Text t = ite.next();
                Text txt = new Text();
                txt.set(t.toString());
                valuesList.add(txt);
                String[] entities = t.toString().split("#-#-#-#");
                if(entities[entities.length-1].equalsIgnoreCase("topic_name"))
                {
                    name = entities[0];
                }
            }

            for(int i=0; i<valuesList.size(); i++) { 
{ 

                Text t2 = valuesList.get(i);
                String[] entities = t2.toString().split("#-#-#-#");
                if(!entities[entities.length-1].contains("topic_name"))
                {
                    if(name!=null) {
                        outKey = entities[1]+"\t"+entities[0]+"\t"+name;
                    }
                    else {
                        outKey = entities[1]+"\t"+entities[0]+"\t"+key.toString();
                    }
                    context.write(new Text(outKey), null);
                }
            }
        }
}
4

2 に答える 2

1

私の推測では、膨大な数の値を持つ単一のキーがあり、リデューサーの次の行が問題を引き起こしていると思います。

valuesList.add(txt);

100m の値を持つキーがあり、サイズが 100m の配列リストを作成しようとしているとします。ある段階で、リデューサー JVM がメモリ不足になります。

これはおそらく、デバッグを実行し、終わらないレデューサーのログを調べることで確認できます。

valuesList.add(txt);
if (valuesList.size() % 10000 == 0) {
  System.err.println(key + "\t" + valueList.size());
}
于 2013-03-21T21:24:28.030 に答える
0

しばらく生の MR を書いていませんでしたが、次のような方法でアプローチします。

キーのすべての値をメモリに保持することは、常に危険です。代わりに、あなたの仕事に別の MR フェーズを追加します。最初の段階で、値に「トピック名」が含まれる場合は newkey = (key, 0)、newValue = value を発行し、値に「トピック名」が含まれない場合は newkey = (key, 1)、newValue = value を発行します。これには、ペアを処理でき、それをソートする方法を知っているカスタムの writablecomparable を作成する必要があります。

次のフェーズのレデューサーでは、新しいキーの最初の要素で分割するパーティショナーを作成します。最後のレデューサーのキーごとのソート出力により、各キーの他の k,v ペアを取得する前に、'name' を使用して k,v ペアを取得することが保証されます。これで、キーに対応する各値の「名前」にアクセスできます。

于 2013-03-24T23:12:02.363 に答える