6

これが私のシナリオです:

複数の子アクターからメッセージを受信するマスターアクターがいます。これらのメッセージには、集約されるデータが含まれています。この集計ロジックでは、共有データ構造を使用して集計を収集する場合、同期の問題に対処する必要がありますか?

else if(arg0 instanceof ReducedMsg){

                           ReducedMsg reduced = (ReducedMsg)arg0;
        counter.decrementAndGet();

        synchronized(finalResult){

            finalResult.add((KeyValue<K, V>) reduced.getReduced());

            if(counter.get() == 0){
                                    if(checkAndReduce(finalResult)){

                    finalResult.clear();
                }
                else{
                    stop();
                    latch.countDown();
                }

            }

        }



    }

ご覧のとおり、finalResultがあり、各メッセージが集約されます。処理ロジックの後で、コレクションもクリアする必要があります。

実際、私が実装しようとしているのは、再帰的(連想的)なリダクションmapreduceです。だから私は私が想定している同期ブロックを維持する必要がありますか?それとも、Akkaが一度に1つのスレッドでonReceiveを実行する可能性がありますか?

このロジックは、小さなデータセットで正確で予測可能な結果を​​生成します。私の問題は、入力データセットが少し大きい場合、コードがハングすることです。これは、同期ブロックのコンテキストスイッチが原因であることを確認したいので、別の設計に踏み込むことができます。

4

1 に答える 1

19

onReceive()同時に呼び出されることはありません。これは、Akkaが提供する最も基本的な保証です。

これは、counter変数がアクターのフィールドであり、他のコードがそのフィールドに直接アクセスできない場合、/の代わりに通常の/を安全に使用できることを意味しintます。また、アクターにカプセル化されて隠されているフィールドである場合は、同期をオンにする必要はありません。longAtomicIntegerAtomicLongfinalResult

最後に、の使用法CountDownLatchは疑わしいです。Akkaアプリケーションでは、同期プリミティブを使用しないでください。アクターは基本的にシングルスレッドであり、すべての通信(データのウェイクアップと受け渡しを含む)はメッセージパッシングを介して実装する必要があります。

これはすべてドキュメントで説明されています:http://doc.akka.io/docs/akka/2.0.2/general/jmm.html#Actors_and_the_Java_Memory_Model

于 2012-07-26T16:04:41.967 に答える