2

アキュムレータを使用していますが、これらのオブジェクトがスレッドセーフかどうかを知りたいですか?

accumIntのタイプですAccumulatorParam<Integer>

// Current value accumInt -> 6
AccumulatorThread t1 = new AccumulatorThread();
t1.setAccum(accumInt); 
t1.setValueToAdd(5);

AccumulatorThread t2 = new AccumulatorThread();
t2.setAccum(accumInt);
t2.setValueToAdd(7);

new Thread(t1).start();
new Thread(t2).start();

System.out.println(accumInt.value()); // 11 or 13 or 18

AccumlatorThreadクラス:

class AccumulatorThread implements Runnable {
    Accumulator<Integer> accum;
    Integer              valueToAdd;

    public Integer getValueToAdd() {
        return valueToAdd;
    }


    public void setValueToAdd(Integer valueToAdd) {
        this.valueToAdd = valueToAdd;
    }

    public Accumulator<Integer> getAccum() {
        return accum;
    }


    public void setAccum(Accumulator<Integer> accum) {
        this.accum = accum;
    }

    public void run() {
        System.out.println("Value to Add in Thread : "+valueToAdd);
        accum.add(valueToAdd);
    }
}

この動作は、スレッド セーフではないことを示しています。何か不足していますか?

4

4 に答える 4

5

アキュムレータはスレッドセーフではありません。マルチスレッドでのみSparkContext使用できます。

于 2014-12-12T07:27:23.207 に答える
5

OOC 同じプログラムでアキュムレータの設定と読み取りの両方を行っているのはなぜですか? アキュムレータは通常、ワーカー スレッドによって追加され、ドライバー スレッドによってのみ読み取られます。

Worker1:   accumulator.add(increment)
Worker2:   accumulator.add(someOtherIncrement)

Driver:  println(accumulator.value)

ここで、ドライバーの異なるスレッドで値を設定/読み取るためのマルチスレッドについて質問しています。何の目的で?その場合は、ローカル JVMAtomicIntegerまたはAtomicLong.

アキュムレータは、連想操作によってのみ「追加」される変数であるため、並行して効率的にサポートできます。

于 2014-12-14T02:02:23.683 に答える