イテレーターの現在の値を保存して、Reduce メソッドのイテレーターの次の値と比較したい場合、Hadoop では、一時変数への参照を単純に割り当てるのではなく、複製する必要があります。
コードをレデューサーに投稿しようとしています。
次の 2 つの部分が表示されます。
- Eclipse でテストするための主な方法
- Hadoop で実行するための reduce メソッド
次の点を除いて、両方のコード行が同一であることがわかります。
- main メソッドはハード コードした ArrayList から Iterator を取得しますが、reduce メソッドはマッパー メソッドから Iterator を取得します。
- もちろん、main メソッドは context.write を実行しません。
両方がほとんど共有するコードは次のとおりです。
MMI currentMMI = null;
MMI previousMMI = null;
UltraAggregation currentAggregation = null;
while (values.hasNext()) {
currentMMI = values.next();
if (currentAggregation == null) {
currentAggregation = new UltraAggregation(currentMMI);
}
if (previousMMI == null) {
//previousMMI = new MMI(currentMMI);
previousMMI = currentMMI;
continue;
}
System.out.println();
System.out.println("currentMMI = " + currentMMI);
System.out.println("previousMMI = " + previousMMI);
System.out.println("equals? " + currentMMI.equals(previousMMI));
System.out.println("==? " + (currentMMI == previousMMI));
System.out.println();
// Business logic goes here and involves a context.write on certain conditions
previousMMI = currentMMI;
}
//final context.write
各ループの最後で、使用したばかりの MMI (「currentMMI」) の参照をオブジェクト変数「previousMMI」に設定していることに気付くでしょう。次に、次のループで、next() の参照を currentMMI に設定します。Eclipse でメイン メソッドを実行すると、予想どおり、次のクエリは false と評価されます。
currentMMI == previousMMI;
currentMMI.equals(previousMMI);
ただし、Hadoop で実行すると、次の 2 つのクエリで currentMMI と previousMMI が常に true と評価されます。
currentMMI == previousMMI;
currentMMI.equals(previousMMI);
previousMMI = currentMMI
行を変更した場合にのみ、previousMMI = new MMI(currentMMI)
false と評価されます。(受信パラメータを本質的に浅く複製する MMI クラスのコンストラクタを作成しました)。
レデューサーを Hadoop で使用し、メイン メソッドでは使用しない場合、参照を設定する代わりにクローンを作成する必要があるのはなぜですか?
レデューサー クラスをコピーして貼り付けます。これには、Eclipse テスト用のメイン メソッドと、Hadoop で実際に使用するための reduce メソッドの 2 つの部分があります。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.cisco.webex.hadoop.ultrautility.models.MMI;
import com.cisco.webex.hadoop.ultrautility.models.UltraAggregation;
public class MMIReducer extends Reducer<Text, MMI, Object, UltraAggregation> {
public static void main(String[] args) {
ArrayList<MMI> mmis = new ArrayList<MMI>();
mmis.add(new MMI("961864,1,1,1,D1,10,0,2013-08-02 06:00:00.0,USA,N,N"));
mmis.add(new MMI("961865,1,1,1,D1,10,1,2013-08-02 07:00:00.0,USA,N,N"));
mmis.add(new MMI("961866,1,1,1,D1,10,2,2013-08-02 08:00:00.0,USA,N,N"));
mmis.add(new MMI("961867,1,1,1,D1,10,3,2013-08-02 09:00:00.0,USA,N,N"));
mmis.add(new MMI("961868,1,1,1,D1,10,4,2013-08-02 10:00:00.0,USA,N,N"));
mmis.add(new MMI("961869,1,1,1,D1,10,5,2013-08-02 11:00:00.0,USA,N,N"));
mmis.add(new MMI("961870,1,1,1,D1,10,6,2013-08-02 12:00:00.0,USA,N,N"));
mmis.add(new MMI("961871,1,1,1,D1,10,7,2013-08-02 13:00:00.0,USA,N,N"));
mmis.add(new MMI("961872,1,1,1,D1,10,8,2013-08-02 14:00:00.0,USA,N,N"));
mmis.add(new MMI("961873,1,1,1,D1,10,9,2013-08-02 15:00:00.0,USA,N,N"));
Iterator<MMI> values = mmis.iterator();
MMI currentMMI = null;
MMI previousMMI = null;
UltraAggregation currentAggregation = null;
while (values.hasNext()) {
currentMMI = values.next();
if (currentAggregation == null) {
currentAggregation = new UltraAggregation(currentMMI);
}
if (previousMMI == null) {
//previousMMI = new MMI(currentMMI);
previousMMI = currentMMI;
continue;
}
System.out.println();
System.out.println("currentMMI = " + currentMMI);
System.out.println("previousMMI = " + previousMMI);
System.out.println("equals? " + currentMMI.equals(previousMMI));
System.out.println("==? " + (currentMMI == previousMMI));
System.out.println();
// Business logic goes here and involves a context.write on certain conditions
//previousMMI = new MMI(currentMMI);
/*
* THIS DOESNT CAUSE LOGIC ERRORS IN MAIN METHOD
*/
previousMMI = currentMMI;
}
//context.write(null, currentAggregation);
}
@Override
public void reduce(Text key, Iterable<MMI> vals, Context context) throws IOException, InterruptedException {
Iterator<MMI> values = vals.iterator();
//key = deviceId
MMI currentMMI = null;
MMI previousMMI = null;
UltraAggregation currentAggregation = null;
while (values.hasNext()) {
currentMMI = values.next();
if (currentAggregation == null) {
currentAggregation = new UltraAggregation(currentMMI);
}
if (previousMMI == null) {
System.out.println("PreviousMMI is null, setting previousMMI to current MMI and continuing");
//previousMMI = new MMI(currentMMI);
previousMMI = currentMMI;
continue;
}
System.out.println();
System.out.println("currentMMI = " + currentMMI);
System.out.println("previousMMI = " + previousMMI);
System.out.println("equals? " + currentMMI.equals(previousMMI));
System.out.println("==? " + (currentMMI == previousMMI));
System.out.println();
// Business logic goes here and involves a context.write on certain conditions
//previousMMI = new MMI(currentMMI); //Acts as intended
/*
* THIS CAUSES ERRORS WHEN EXECUTED THROUGH HADOOP
*/
previousMMI = currentMMI; // Causes errors
}
context.write(null, currentAggregation);
}
}
これは、静的な値を使用して eclipse でメイン メソッドを実行したときの stdout からの切り捨てられた結果です。
currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 06:00:00 PDT 2013;Uptime|0.0
equals? false
==? false
currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 08:00:00 PDT 2013;Uptime|2.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
equals? false
==? false
Hadoop jar を実行したときの切り捨てられた結果を次に示します。
currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
equals? true
==? true
currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
equals? true
==? true
Eclipse ではなく Hadoop 用に複製する必要があるのはなぜですか?