1

イテレーターの現在の値を保存して、Reduce メソッドのイテレーターの次の値と比較したい場合、Hadoop では、一時変数への参照を単純に割り当てるのではなく、複製する必要があります。

コードをレデューサーに投稿しようとしています。

次の 2 つの部分が表示されます。

  1. Eclipse でテストするための主な方法
  2. Hadoop で実行するための reduce メソッド

次の点を除いて、両方のコード行が同一であることがわかります。

  1. main メソッドはハード コードした ArrayList から Iterator を取得しますが、reduce メソッドはマッパー メソッドから Iterator を取得します。
  2. もちろん、main メソッドは context.write を実行しません。

両方がほとんど共有するコードは次のとおりです。

MMI currentMMI = null;
MMI previousMMI = null;
UltraAggregation currentAggregation = null;

while (values.hasNext()) {
    currentMMI = values.next();
    if (currentAggregation == null) {
        currentAggregation = new UltraAggregation(currentMMI);
    }
    if (previousMMI == null) {
        //previousMMI = new MMI(currentMMI);
        previousMMI = currentMMI;
        continue;
    }
    System.out.println();
    System.out.println("currentMMI = " + currentMMI);
    System.out.println("previousMMI = " + previousMMI);
    System.out.println("equals? " + currentMMI.equals(previousMMI));
    System.out.println("==? " + (currentMMI == previousMMI));
    System.out.println();

    // Business logic goes here and involves a context.write on certain conditions

    previousMMI = currentMMI;
}
//final context.write

各ループの最後で、使用したばかりの MMI (「currentMMI」) の参照をオブジェクト変数「previousMMI」に設定していることに気付くでしょう。次に、次のループで、next() の参照を currentMMI に設定します。Eclipse でメイン メソッドを実行すると、予想どおり、次のクエリは false と評価されます。

currentMMI == previousMMI;
currentMMI.equals(previousMMI);  

ただし、Hadoop で実行すると、次の 2 つのクエリで currentMMI と previousMMI が常に true と評価されます。

currentMMI == previousMMI;
currentMMI.equals(previousMMI);

previousMMI = currentMMI行を変更した場合にのみ、previousMMI = new MMI(currentMMI)false と評価されます。(受信パラメータを本質的に浅く複製する MMI クラスのコンストラクタを作成しました)。

レデューサーを Hadoop で使用し、メイン メソッドでは使用しない場合、参照を設定する代わりにクローンを作成する必要があるのはなぜですか?

レデューサー クラスをコピーして貼り付けます。これには、Eclipse テスト用のメイン メソッドと、Hadoop で実際に使用するための reduce メソッドの 2 つの部分があります。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.cisco.webex.hadoop.ultrautility.models.MMI;
import com.cisco.webex.hadoop.ultrautility.models.UltraAggregation;

public class MMIReducer extends Reducer<Text, MMI, Object, UltraAggregation> {
    public static void main(String[] args) {
        ArrayList<MMI> mmis = new ArrayList<MMI>();
        mmis.add(new MMI("961864,1,1,1,D1,10,0,2013-08-02 06:00:00.0,USA,N,N"));
        mmis.add(new MMI("961865,1,1,1,D1,10,1,2013-08-02 07:00:00.0,USA,N,N"));
        mmis.add(new MMI("961866,1,1,1,D1,10,2,2013-08-02 08:00:00.0,USA,N,N"));
        mmis.add(new MMI("961867,1,1,1,D1,10,3,2013-08-02 09:00:00.0,USA,N,N"));
        mmis.add(new MMI("961868,1,1,1,D1,10,4,2013-08-02 10:00:00.0,USA,N,N"));
        mmis.add(new MMI("961869,1,1,1,D1,10,5,2013-08-02 11:00:00.0,USA,N,N"));
        mmis.add(new MMI("961870,1,1,1,D1,10,6,2013-08-02 12:00:00.0,USA,N,N"));
        mmis.add(new MMI("961871,1,1,1,D1,10,7,2013-08-02 13:00:00.0,USA,N,N"));
        mmis.add(new MMI("961872,1,1,1,D1,10,8,2013-08-02 14:00:00.0,USA,N,N"));
        mmis.add(new MMI("961873,1,1,1,D1,10,9,2013-08-02 15:00:00.0,USA,N,N"));

        Iterator<MMI> values = mmis.iterator();

        MMI currentMMI = null;
        MMI previousMMI = null;
        UltraAggregation currentAggregation = null;

        while (values.hasNext()) {
            currentMMI = values.next();
            if (currentAggregation == null) {
                currentAggregation = new UltraAggregation(currentMMI);
            }
            if (previousMMI == null) {
                //previousMMI = new MMI(currentMMI);
                previousMMI = currentMMI;
                continue;
            }
            System.out.println();
            System.out.println("currentMMI = " + currentMMI);
            System.out.println("previousMMI = " + previousMMI);
            System.out.println("equals? " + currentMMI.equals(previousMMI));
            System.out.println("==? " + (currentMMI == previousMMI));
            System.out.println();

            // Business logic goes here and involves a context.write on certain conditions

            //previousMMI = new MMI(currentMMI);
            /*
            * THIS DOESNT CAUSE LOGIC ERRORS IN MAIN METHOD
            */
            previousMMI = currentMMI;
        }
        //context.write(null, currentAggregation);
    }

    @Override
    public void reduce(Text key, Iterable<MMI> vals, Context context) throws IOException, InterruptedException {
        Iterator<MMI> values = vals.iterator();

        //key = deviceId
        MMI currentMMI = null;
        MMI previousMMI = null;
        UltraAggregation currentAggregation = null;

        while (values.hasNext()) {
            currentMMI = values.next();
            if (currentAggregation == null) {
                currentAggregation = new UltraAggregation(currentMMI);
            }
            if (previousMMI == null) {
                System.out.println("PreviousMMI is null, setting previousMMI to current MMI and continuing");
                //previousMMI = new MMI(currentMMI);
                previousMMI = currentMMI;
                continue;
            }
            System.out.println();
            System.out.println("currentMMI = " + currentMMI);
            System.out.println("previousMMI = " + previousMMI);
            System.out.println("equals? " + currentMMI.equals(previousMMI));
            System.out.println("==? " + (currentMMI == previousMMI));
            System.out.println();

            // Business logic goes here and involves a context.write on certain conditions

            //previousMMI = new MMI(currentMMI); //Acts as intended
            /*
            * THIS CAUSES ERRORS WHEN EXECUTED THROUGH HADOOP
            */
            previousMMI = currentMMI; // Causes errors
        }
        context.write(null, currentAggregation);
    }
}

これは、静的な値を使用して eclipse でメイン メソッドを実行したときの stdout からの切り捨てられた結果です。

currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 06:00:00 PDT 2013;Uptime|0.0
equals? false
==? false


currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 08:00:00 PDT 2013;Uptime|2.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
equals? false
==? false

Hadoop jar を実行したときの切り捨てられた結果を次に示します。

currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
equals? true
==? true

currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
equals? true
==? true

Eclipse ではなく Hadoop 用に複製する必要があるのはなぜですか?

4

2 に答える 2

4

すべての値をメモリに格納するのは非常に非効率的であるため、オブジェクトは再利用され、一度に 1 つずつ読み込まれます。良い説明については、この他の SO の質問を参照してください。概要:

[...]Iterable値リストをループすると、各 Object インスタンスが再利用されるため、一度に 1 つのインスタンスしか保持されません。

于 2013-12-31T22:36:11.890 に答える