0

私はHadoopで短期間働いていて、Javaで結合を実装しようとしています。Map-Side か Reduce-Side かは関係ありません。実装が簡単になるはずだったので、Reduce-Side join を採用しました。Java は、結合、集計などに最適な選択ではないことを知っています。すでに行っている Hive または Pig を選択する必要があります。ただし、私は研究プロジェクトに取り組んでおり、比較を行うためにこれら 3 つの言語をすべて使用する必要があります。

とにかく、構造の異なる2つの入力ファイルがあります。1 つは key|value で、もう 1 つは key|value1;value2;value3;value4 です。各入力ファイルからの 1 つのレコードは、次のようになります。

  • 入力 1:1;2010-01-10T00:00:01
  • 入力 2:1;23;Blue;2010-01-11T00:00:01;9999-12-31T23:59:59

Hadoop Definitve Guide ブックの例に従いましたが、うまくいきませんでした。Java ファイルをここに投稿しているので、何が問題なのかがわかります。

public class LookupReducer extends Reducer<TextPair,Text,Text,Text> {


private String result = "";
private String msisdn;
private String attribute, product;
private long trans_dt_long, start_dt_long, end_dt_long; 
private String trans_dt, start_dt, end_dt; 

@Override
public void reduce(TextPair key, Iterable<Text> values, Context context) 
        throws IOException, InterruptedException {

     context.progress();
    //value without key to remember

    Iterator<Text> iter =  values.iterator();

 for (Text val : values) {

Text recordNoKey = val;     //new Text(iter.next());

String valSplitted[] = recordNoKey.toString().split(";"); 

//if the input is coming from CDR set corresponding values

    if(key.getSecond().toString().equals(CDR.CDR_TAG))
    {
        trans_dt = recordNoKey.toString();
        trans_dt_long = dateToLong(recordNoKey.toString());
    }
  //if the input is coming from Attributes set corresponding values
    else if(key.getSecond().toString().equals(Attribute.ATT_TAG))
    {
        attribute = valSplitted[0];
        product = valSplitted[1];
        start_dt = valSplitted[2];
        start_dt_long = dateToLong(valSplitted[2]);
        end_dt = valSplitted[3];
        end_dt_long = dateToLong(valSplitted[3]);;
    }

        Text record = val;  //iter.next();
        //System.out.println("RECORD: " + record);
        Text outValue = new Text(recordNoKey.toString() + ";" + record.toString());     

if(start_dt_long < trans_dt_long && trans_dt_long < end_dt_long)
       {
    //concat output columns
        result = attribute + ";" + product + ";" + trans_dt;    

    context.write(key.getFirst(), new Text(result));
    System.out.println("KEY: " + key);
        }
    }
}

private static long dateToLong(String date){
    DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date parsedDate = null;
    try {
        parsedDate = formatter.parse(date);
    } catch (ParseException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    long dateInLong = parsedDate.getTime();

    return dateInLong;

}

public static class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    public TextPair(){
        set(new Text(), new Text());
    }

    public TextPair(String first, String second){
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second){
        set(first, second);
    }

    public void set(Text first, Text second){
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public void setFirst(Text first) {
        this.first = first;
    }

    public Text getSecond() {
        return second;
    }

    public void setSecond(Text second) {
        this.second = second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        first.write(out);
        second.write(out);
    }

    @Override
    public int hashCode(){
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o){
        if(o instanceof TextPair)
        {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString(){
        return first + ";" + second;
    }

    @Override
    public int compareTo(TextPair tp) {
        // TODO Auto-generated method stub
        int cmp = first.compareTo(tp.first);
        if(cmp != 0)
            return cmp;
        return second.compareTo(tp.second);
    }


    public static class FirstComparator extends WritableComparator {

        protected FirstComparator(){
            super(TextPair.class, true);
        }

        @Override
        public int compare(WritableComparable comp1, WritableComparable comp2){
            TextPair pair1 = (TextPair) comp1;
            TextPair pair2 = (TextPair) comp2;
            int cmp = pair1.getFirst().compareTo(pair2.getFirst());

            if(cmp != 0)
                return cmp;

            return -pair1.getSecond().compareTo(pair2.getSecond());
        }
    }

    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() 
        {
            super(TextPair.class, true);
        }

        @Override
        public int compare(WritableComparable comp1, WritableComparable comp2)
        {
            TextPair pair1 =  (TextPair) comp1;
            TextPair pair2 =  (TextPair) comp2;

            return pair1.compareTo(pair2);
        }
    }

}

}

public class Joiner  extends Configured implements Tool {

public static final String DATA_SEPERATOR =";";                                      //Define the symbol for seperating the output data
public static final String NUMBER_OF_REDUCER = "1";                                  //Define the number of the used reducer jobs
public static final String COMPRESS_MAP_OUTPUT = "true";                             //if the output from the mapping process should be compressed, set COMPRESS_MAP_OUTPUT = "true" (if not set it to "false")
public static final String 
            USED_COMPRESSION_CODEC = "org.apache.hadoop.io.compress.SnappyCodec";    //set the used codec for the data compression
public static final boolean JOB_RUNNING_LOCAL = true;                                //if you run the Hadoop job on your local machine, you have to set JOB_RUNNING_LOCAL = true
                                                                                     //if you run the Hadoop job on the Telefonica Cloud, you have to set JOB_RUNNING_LOCAL = false
public static final String OUTPUT_PATH = "/home/hduser"; //set the folder, where the output is saved. Only needed, if JOB_RUNNING_LOCAL = false



public static class KeyPartitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(/*[*/TextPair key/*]*/, Text value, int numPartitions) {
        System.out.println("numPartitions: " + numPartitions);
          return (/*[*/key.getFirst().hashCode()/*]*/ & Integer.MAX_VALUE) % numPartitions;
        }
}

private static Configuration hadoopconfig() {
    Configuration conf = new Configuration();

    conf.set("mapred.textoutputformat.separator", DATA_SEPERATOR);
    conf.set("mapred.compress.map.output", COMPRESS_MAP_OUTPUT);
    //conf.set("mapred.map.output.compression.codec", USED_COMPRESSION_CODEC);
    conf.set("mapred.reduce.tasks", NUMBER_OF_REDUCER);
    return conf;
}

@Override
public int run(String[] args) throws Exception {
    // TODO Auto-generated method stub
    if ((args.length != 3) && (JOB_RUNNING_LOCAL)) {

        System.err.println("Usage: Lookup <CDR-inputPath> <Attribute-inputPath> <outputPath>");
        System.exit(2);
    }

    //starting the Hadoop job
    Configuration conf = hadoopconfig();
    Job job = new Job(conf, "Join cdrs and attributes");
    job.setJarByClass(Joiner.class);

    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CDRMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AttributeMapper.class);
    //FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    //expecting a folder instead of a file

    if(JOB_RUNNING_LOCAL)
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
    else
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));


    job.setPartitionerClass(KeyPartitioner.class);
    job.setGroupingComparatorClass(TextPair.FirstComparator.class);
    job.setReducerClass(LookupReducer.class);

    job.setMapOutputKeyClass(TextPair.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    return job.waitForCompletion(true) ? 0 : 1;
}

 public static void main(String[] args) throws Exception {

     int exitCode = ToolRunner.run(new Joiner(), args);
     System.exit(exitCode);

 }
}

public class Attribute {

public static final String ATT_TAG = "1";


public static class AttributeMapper 
extends Mapper<LongWritable, Text, TextPair, Text>{

    private static Text values = new Text();
    //private Object output = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //partition the input line by the separator semicolon   
        String[] attributes = value.toString().split(";");
        String valuesInString = "";

        if(attributes.length != 5)
            System.err.println("Input column number not correct. Expected 5, provided " + attributes.length
                    + "\n" + "Check the input file");
        if(attributes.length == 5)
        {
            //setting the values with the input values read above
            valuesInString = attributes[1] + ";" + attributes[2] + ";" + attributes[3] + ";" + attributes[4];
            values.set(valuesInString);
        //writing out the key and value pair
        context.write( new TextPair(new Text(String.valueOf(attributes[0])), new Text(ATT_TAG)), values);
            }
    }
}   

}

public class CDR    {


public static final String CDR_TAG = "0";

 public static class CDRMapper 
    extends Mapper<LongWritable, Text, TextPair, Text>{

        private static Text values = new Text();
        private Object output = new Text();

    @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //partition the input line by the separator semicolon   
    String[] cdr = value.toString().split(";");

    //setting the values with the input values read above
    values.set(cdr[1]);
    //output = CDR_TAG + cdr[1];

    //writing out the key and value pair
    context.write( new TextPair(new Text(String.valueOf(cdr[0])), new Text(CDR_TAG)), values);
        }


     }

}

誰かがチュートリアルへのリンク、またはそのような結合機能が実装されている簡単な例を投稿できれば幸いです。いろいろ調べたのですが、コードが完全でなかったり、説明が足りなかったり。

4

1 に答える 1

3

正直なところ、私はあなたのコードが何をしようとしているのかわかりませんが、それはおそらく私が別の方法でそれを行い、あなたが使用しているAPIに精通していないためです。

私は次のように最初から始めます:

  • マッパーを作成して、ファイルの1つを読み取ります。読み取られた行ごとに、キーと値のペアをコンテキストに書き込みます。キーはキーから作成されたテキストであり、値は「1」を入力レコード全体と連結して作成された別のテキストです。
  • 他のファイル用に別のマッパーを作成します。このマッパーは最初のマッパーと同じように機能しますが、値は「2」を入力レコード全体と連結して作成されたテキストです。
  • 結合を行うためのレデューサーを作成します。reduce()メソッドは、特定のキーに対して書き込まれたすべてのレコードを取得します。値が「1」で始まるか「2」で始まるかを確認することで、どの入力ファイル(したがってレコードのデータ形式)を判別できます。1つ、もう1つ、または両方のレコードタイプがあるかどうかがわかれば、1つまたは2つのレコードのデータをマージするために必要なロジックを記述できます。

ちなみに、MultipleInputsクラスを使用して、ジョブ/ドライバークラスに複数のマッパーを構成します。

于 2012-12-07T17:44:13.783 に答える