0

EquivalenceClsAggValue配列のデータフィールドを持つクラスを定義しました( と呼ばれaggValuesます)。

class public class EquivalenceClsAggValue extends Configured implements WritableComparable<EquivalenceClsAggValue>{

public ArrayList<SortedMapWritable> aggValues;  

次のように、タイプの別のオブジェクトを取得し、このクラスにEquivalenceClsAggValueマージするメソッドがあります。aggValuesaggValues

public void addEquivalenceCls(EquivalenceClsAggValue eq){
    //comment: eq contains only one entry as it comes from the mapper

    if (this.aggValues.size()==0){ //new line
        this.aggValues = eq.aggValues;  
                    return;     
    }

    for(int i=0;i<eq.aggValues.size();i++){

        SortedMapWritable cm = aggValues.get(i); //cm: current map
        SortedMapWritable nm = eq.aggValues.get(i); //nm: new map
        Text nk = (Text) nm.firstKey();//nk: new key

        if(cm.containsKey(nk)){//increment the value
            IntWritable ovTmp = (IntWritable) cm.get(nk);
            int ov = ovTmp.get();
            cm.remove(nk);
            cm.put(nk, new IntWritable(ov+1));
        }
        else{//add new entry
            cm.put(nk, new IntWritable(1));
        }


    }
}

しかし、この関数は 2 つをマージしていませんaggValues。誰かが私がそれを理解するのを手伝ってくれますか? これが私がこのメソッドを呼び出す方法です:

public void reduce(IntWritable keyin,Iterator<EquivalenceClsAggValue> valuein,OutputCollector<IntWritable, EquivalenceClsAggValue> output,Reporter arg3) throws IOException {

        EquivalenceClsAggValue comOutput = valuein.next();//initialize the output with the first input

        while(valuein.hasNext()){
            EquivalenceClsAggValue e = valuein.next();
            comOutput.addEquivalenceCls(e);             
        }           
        output.collect(keyin, comOutput);
    }
4

1 に答える 1

1

オブジェクトの再利用に違反しているようです。Hadoop は同じオブジェクトを再利用するため、各呼び出しはvaluein.next()実際には同じオブジェクト参照を返しますが、そのオブジェクトの内容は readFields メソッドによって再初期化されます。

次のように変更してみてください (集約する新しいインスタンスを作成します)。

 EquivalenceClsAggValue comOutput = new EquivalenceClsAggValue();

 while(valuein.hasNext()){
   EquivalenceClsAggValue e = valuein.next();
   comOutput.addEquivalenceCls(e);             
 }           
 output.collect(keyin, comOutput);

EDIT :おそらく集計メソッドも更新する必要があります(オブジェクトの再利用に注意するため):

public void addEquivalenceCls(EquivalenceClsAggValue eq){
  //comment: eq contains only one entry as it comes from the mapper

  for(int i=0;i<eq.aggValues.size();i++){

    SortedMapWritable cm = aggValues.get(i); //cm: current map
    SortedMapWritable nm = eq.aggValues.get(i); //nm: new map
    Text nk = (Text) nm.firstKey();//nk: new key

    if(cm.containsKey(nk)){//increment the value
        // you don't need to remove and re-add, just update the IntWritable
        IntWritable ovTmp = (IntWritable) cm.get(nk);
        ovTmp.set(ovTmp.get() + 1);
    }
    else{//add new entry
        // be sure to create a copy of nk when you add in to the map
        cm.put(new Text(nk), new IntWritable(1));
    }
  }
}
于 2013-01-13T21:43:27.157 に答える