1

SequenceFileを使用して2つのmapReduceプログラム間でデータを渡そうとしています。渡したいデータの形式は>です。何らかの理由で、マップ内の一部のエントリが1つのプログラムから別のプログラムに渡されないようです。これが私のコードです。最初にdeSequenceFileOutputを生成するレデューサー、次にそれから読み取るマッパーです。

public static class IntSumReducer extends Reducer {

public void reduce(Text key, Iterable<Text> values, 
                   Context context
                   ) throws IOException, InterruptedException {

    MapWritable vector = new MapWritable() ;

    for (Text val : values){
        if(vector.containsKey(val)){
            vector.put(val , new IntWritable(((IntWritable)vector.get(val)).get() + 1));
        }
        else
            vector.put(val , new IntWritable(1));
    }

    context.write(key, vector);

        }
    }

とマッパー:

public static class TokenizerMapper extends Mapper {

  private final static int cota = 100;
  private final static double ady = 0.25;

  public void map(Text key, MapWritable value, Context context
          ) throws IOException, InterruptedException {

      IntWritable tot = (IntWritable)value.get(key);

      int total = tot.get();


      if(total > cota){
          MapWritable vector = new MapWritable() ;
          Set<Writable> keys = value.keySet();

          Iterator<Writable> iterator = keys.iterator();
          while(iterator.hasNext()){
              Text llave = (Text) iterator.next();
              if(!llave.equals(key)){
                  IntWritable cant = (IntWritable) value.get(llave);
                  double rel = (((double)cant.get())/(double)total);
                  if(cant.get() > cota && rel > ady ){
                      vector.put(llave, new DoubleWritable(rel));
                  }
              }
          }
          context.write(key,vector);     
      }
  }

}

4

1 に答える 1

1
for (Text val : values){
    if(vector.containsKey(val)){
        vector.put(val , new IntWritable(((IntWritable)vector.get(val)).get() + 1));
    }
    else
        vector.put(val , new IntWritable(1));
}

ここに問題があります-valTextオブジェクトはhadoopによって再利用されるため、vector.putを呼び出すときは、val参照から離れるために新しいTextオブジェクトを作成する必要があります(その値はforループの次の反復で変更されます)。

あなたは彼が従うようにあなたのロジックを修正することができ、それはうまくいくはずです(私はまたより効率的になるようにカウンターインクリメントロジックを更新しました):

IntWritable tmpInt;
for (Text val : values){
    tmpInt = (IntWritable) vector.get(val);

    if(tmpInt == null) {
        tmpInt = new IntWritable(0);
        // create a copy of val Text object
        vector.put(new Text(val), tmpInt);
    }

    // update the IntWritable wrapped int value
    tmpInt.set(tmpInt.get() + 1);

    // Note: you don't need to re-insert the IntWritable into the map
}
于 2012-05-14T18:25:27.060 に答える