8

配列のプール(マップに格納されている参照)を更​​新および処理するための単一のライタースレッドと単一のリーダースレッドがあります。書き込みと読み取りの比率はほぼ5:1です(書き込みの待ち時間が問題になります)。

ライタースレッドは、いくつかのイベントに基づいて、プール内の配列のいくつかの要素を更新する必要があります。書き込み操作全体(すべての要素)はアトミックである必要があります。

ライタースレッドが以前に更新された配列を更新している場合は、リーダースレッドがそれを確実に読み取るようにします(揮発性のようなものですが、個々のフィールドではなく配列全体で)。基本的に、古い値を読み取る余裕はありますが、ブロックすることはできません。

また、書き込みは非常に頻繁であるため、読み取り/書き込み中に新しいオブジェクトを作成したり、配列全体をロックしたりするのは非常にコストがかかります。

より安価なロックを使用または使用できる、より効率的なデータ構造はありますか?

4

8 に答える 8

2

より効率的なデータ構造はありますか?

そのとおり!それらは永続データ構造と呼ばれます。以前のバージョンとの違いを保存するだけで、ベクター/マップなどの新しいバージョンを表すことができます。すべてのバージョンは不変であるため、同時実行に適しています(ライターはリーダーに干渉/ブロックせず、その逆も同様です)。

変更を表現するために、永続データ構造への参照をなどの参照型に格納し、AtomicReferenceそれらの参照が指すものを変更します-構造自体ではありません

Clojureは、永続データ構造の最高の実装を提供します。それらは純粋で効率的なJavaで書かれています。

次のプログラムは、永続データ構造を使用して、説明した問題にどのようにアプローチするかを示しています。

import clojure.lang.IPersistentVector;
import clojure.lang.PersistentVector;

public class AtomicArrayUpdates {

    public static Map<Integer, AtomicReference<IPersistentVector>> pool
        = new HashMap<>();
    public static Random rnd = new Random();
    public static final int SIZE = 60000;
    // For simulating the reads/writes ratio
    public static final int SLEEP_TIMÉ = 5;

    static {        
        for (int i = 0; i < SIZE; i++) {
            pool.put(i, new AtomicReference(PersistentVector.EMPTY));
        }
    }

    public static class Writer implements Runnable {   
        @Override public void run() {
            while (true) {
                try {
                    Thread.sleep(SLEEP_TIMÉ);
                } catch (InterruptedException e) {}

                int index = rnd.nextInt(SIZE);
                IPersistentVector vec = pool.get(index).get();

                // note how we repeatedly assign vec to a new value
                // cons() means "append a value".
                vec = vec.cons(rnd.nextInt(SIZE + 1)); 
                // assocN(): "update" at index 0
                vec = vec.assocN(0, 42); 
                // appended values are nonsense, just an example!
                vec = vec.cons(rnd.nextInt(SIZE + 1)); 

                pool.get(index).set(vec);

            }
        }
    }

    public static class Reader implements Runnable {
        @Override public void run() {
            while (true) {
                try {
                    Thread.sleep(SLEEP_TIMÉ * 5);
                } catch (InterruptedException e) {}

                IPersistentVector vec = pool.get(rnd.nextInt(SIZE)).get();
                // Now you can do whatever you want with vec.
                // nothing can mutate it, and reading it doesn't block writers!
            }
        } 
    }

    public static void main(String[] args) {
        new Thread(new Writer()).start();
        new Thread(new Reader()).start();
    }
}
于 2013-03-16T01:52:49.650 に答える
2

このアイデアはどうですか: ライター スレッドは配列を変更しません。更新をキューに入れるだけです。

リーダー スレッドは、アレイの安定したスナップショットを必要とする読み取りセッションに入るたびに、キューに入れられた更新をアレイに適用してから、アレイを読み取ります。

class Update
{
    int position;
    Object value;
}

ArrayBlockingQueue<Update> updates = new ArrayBlockingQueue<>(Integer.MAX_VALUE);

void write()
{
    updates.put(new Update(...));
}

Object[] read()
{
    Update update;
    while((update=updates.poll())!=null)
        array[update.position] = update.value;

    return array;
}
于 2013-03-16T00:18:11.877 に答える
2

配列に 20 個の double しか含まれていないことを考えると、別のアイデアです。

書き込み用と読み取り用の 2 つの配列を用意します。

リーダーは、読み取り中に読み取り配列をロックします。

read()
    lock();
    read stuff
    unlock();

Writer は最初に書き込み配列を変更し、次に読み取り配列をtryLockします。ロックに失敗した場合は問題なく、write() が戻ります。ロックが成功した場合は、書き込み配列を読み取り配列にコピーしてから、ロックを解放します。

write()
    update write array
    if tryLock()
        copy write array to read array
        unlock()

リーダーはブロックできますが、20 個の double をコピーするのにかかる時間だけであり、短いです。

do{}while(tryLock()==false);リーダーは、一時停止を避けるために、スピン ロックを使用する必要があります。

于 2013-03-16T01:05:06.723 に答える
1

私は次のようにします:

  • 全体を同期させて、パフォーマンスが十分かどうかを確認します。書き込みスレッドと読み取りスレッドが 1 つしかないことを考えると、競合は少なく、これで十分に機能する可能性があります。

    private final Map<Key, double[]> map = new HashMap<> ();
    
    public synchronized void write(Key key, double value, int index) {
        double[] array = map.get(key);
        array[index] = value;
    }
    
    public synchronized double[] read(Key key) {
        return map.get(key);
    }
    
  • 遅すぎる場合は、ライターに配列のコピーを作成してもらい、いくつかの値を変更して、新しい配列をマップに戻します。配列のコピーは非常に高速であることに注意してください。通常、20 項目の配列は 100 ナノ秒未満である可能性が高くなります。

    //If all the keys and arrays are constructed before the writer/reader threads 
    //start, no need for a ConcurrentMap - otherwise use a ConcurrentMap
    private final Map<Key, AtomicReference<double[]>> map = new HashMap<> ();
    
    public void write(Key key, double value, int index) {
        AtomicReference<double[]> ref = map.get(key);
        double[] oldArray = ref.get();
        double[] newArray = oldArray.clone();
        newArray[index] = value;
        //you might want to check the return value to see if it worked
        //or you might just skip the update if another writes was performed
        //in the meantime
        ref.compareAndSet(oldArray, newArray);
    }
    
    public double[] read(Key key) {
        return map.get(key).get(); //check for null
    }
    

書き込みは非常に頻繁に行われるため、読み取り/書き込み中に新しいオブジェクトを作成したり、配列全体をロックしたりすると、非常にコストがかかります。

どのくらいの頻度ですか?ミリ秒ごとに何百ものものがない限り、問題ないはずです。

また、次の点にも注意してください。

  • オブジェクトの作成は、Java ではかなり安価です (約 10 CPU サイクル = 数ナノ秒と考えてください)。
  • 存続期間の短いオブジェクトのガベージ コレクションは一般に無料です (オブジェクトが若い世代にある限り、到達できない場合は GC によってアクセスされません)。
  • 一方、存続期間の長いオブジェクトは古い世代にコピーする必要があるため、GC のパフォーマンスに影響を与えます。
于 2013-03-16T11:56:57.087 に答える
0
  • 書き込みがいつ変更されたかを追跡するための単純なミューテックスと、readArray2つの静的参照が必要です。writeArray

  • changeWriteArray というロックされた関数を使用して、writeArray の deepCopy に変更を加えます。

    Synchronized String[] changeWriteArray(String[] writeArrayCopy, other params go here){ // ここで writeArray の deepCopy を変更します

          //then return deepCopy
          return writeArrayCopy;
    }
    
  • はどちらでもないchangeWriteArrayコピーを返すため、実質的に副作用のない関数型プログラミングであることに注意してください。readArraywriteArray

  • 呼び出す人は誰でもchangeWriteArrayそれを として呼び出す必要がありますwriteArray = changeWriteArray(writeArray.deepCopy())

  • ミューテックスは と の両方によって変更されchangeWriteArrayますupdateReadArrayが、 によってのみチェックされupdateReadArrayます。ミューテックスが設定されている場合、実際のブロックへのupdateReadArray参照を指すだけですreadArraywriteArray

編集:

あなたが言及した答えに関する@vemv。考え方は同じですが、大きな違いがあります。2 つの静的参照はstatic、実際に変更を にコピーする時間を費やさないようにするためのものreadArrayです。のポインターがreadArrayを指すように移動されwriteArrayます。事実上changeWriteArray、必要に応じて生成される tmp 配列を使用して交換しています。また、ここでのロックは最小限です。いつでも複数のリーダーを使用できるという意味で、読み取りにはロックが必要ないからです。

実際、このアプローチでは、同時読み取りの数を保持し、いつ更新するかについてカウンターがゼロであることを確認できreadArrayますwriteArray。繰り返しますが、その読み取りにはロックはまったく必要ありません。

于 2013-03-16T05:58:41.813 に答える
0

@ zhong.j.yuの回答を改善すると、書き込みが発生したときに実行しようとするのではなく、書き込みをキューに入れることをお勧めします。ただし、更新が非常に高速に行われるため、継続的に更新が行われるとリーダーが窒息する場合、この問題に取り組む必要があります。次の読み取りで対処します)。

独自の同期キューを作成する必要があります。これはリンクされたリストに基づいており、次の 2 つのメソッドのみが含まれます。

public synchronised enqeue(Write write);

このメソッドは、アトミックに書き込みをキューに入れます。書き込みが実際にキューに入るよりも速くなると、デッドロックが発生する可能性がありますが、それを達成するには毎秒数十万回の書き込みが必要になると思います。

public synchronised Element cut();

これにより、キューがアトミックに空になり、その先頭 (または末尾) が Element オブジェクトとして返されます。これには、他の要素 (Element.next など、通常のリンク リストのもの) のチェーンが含まれます。これらはすべて、最後の読み取り以降の一連の書き込みを表します。その後、キューは空になり、新しい書き込みを受け入れる準備が整います。その後、リーダーは要素チェーンをトレースし (それまではスタンドアロンであり、後続の書き込みには影響されません)、書き込みを実行し、最後に読み取りを実行できます。リーダーが読み取りを処理している間、新しい書き込みがキューに入れられますが、それらは次の読み取りの問題になります。

C++ ではありますが、サウンド データ バッファーを表すために、これを 1 回書きました。書き込み (ドライバーはより多くのデータを送信します) は、読み取り (データに対する数学的なもの) よりも多くありましたが、書き込みはできるだけ早く終了する必要がありました。(データはリアルタイムで取得されたので、次のバッチがドライバーで準備される前に保存する必要がありました。)

于 2013-03-17T23:50:45.493 に答える
0

次のバリエーションは、私の以前の回答zhong.j.yu の 1 つの両方に触発されています。

ライターはリーダーに干渉/ブロックせず、その逆もありません。また、スレッドの安全性/可視性の問題や、微妙な推論が行われることもありません。

public class V2 {

    static Map<Integer, AtomicReference<Double[]>> commited = new HashMap<>();
    static Random rnd = new Random();

    static class Writer {
        private Map<Integer, Double[]> writeable = new HashMap<>();
        void write() {        
            int i = rnd.nextInt(writeable.size());   
            // manipulate writeable.get(i)...
            commited.get(i).set(writeable.get(i).clone());
        }
    }

    static class Reader{
        void read() {
            double[] arr = commited.get(rnd.nextInt(commited.size())).get();
            // do something useful with arr...
        } 
    }

}
于 2013-03-16T03:10:19.800 に答える