4

Java のConcurrentSkipListMapに基づいて同時スキップ リスト マップを実装しています。違いは、リストに重複を許可することと、リストをインデックス可能にすることです(リストの N 番目の要素を見つけるには O(lg( n)) 時間、標準のスキップ リストのような O(n) 時間ではなく)。これらの変更は問題を引き起こしていません。

さらに、スキップ リストのキーは可変です。たとえば、リスト要素が整数 {0, 4, 7} の場合、リスト構造の変更を求めることなく、中間要素のキーを [0, 7] の任意の値に変更できます。キーが (-inf, -1] または [8, +inf) に変更された場合、要素は削除され、リストの順序を維持するために再度追加されます。これを削除とそれに続く O(lg(n)) 挿入として実装するのではなく、これを削除とそれに続く線形トラバーサルとそれに続く O(1) 挿入として実装します (O(1) - 99 の予​​想される実行時間で)ノードが隣接ノードと交換される時間の割合)。

完全に新しいノードを挿入することは (起動後に) まれであり、ノードを削除する (すぐに再追加することなく) ことは決してありません。ほとんどすべての操作は、i 番目のインデックスの要素を取得するための elementAt(i)、またはキーが変更された後にノードを交換するための操作です。

私が直面している問題は、キー変更クラスを実装する方法にあります。概念的には、次のようなことをしたいと思います

public class Node implements Runnable {
    private int key;
    private Node prev, next;
    private BlockingQueue<Integer> queue;

    public void update(int i) {
        queue.offer(i);
    }

    public void run() {
        while(true) {
            int temp = queue.take();
            temp += key;
            if(prev.getKey() > temp) {
                // remove node, update key to temp, perform backward linear traversal, and insert
            } else if(next.getKey() < temp) {
                // remove node, update key to temp, perform forward linear traveral, and insert
            } else {
                key = temp; // node doesn't change position
            }
        }
    }
}

(insertから呼び出されるサブメソッドはrun、同じ場所に同時に挿入しようとする 2 つのノードの問題を処理するために CAS を使用します (ConcurrentSkipListMap競合する挿入を処理する方法と同様) - 概念的には、これは最初のノードがノードをロックした場合と同じですただし、競合がない場合はオーバーヘッドが削減されます)。

このようにして、リストが常に順序どおりであることを確認できます (キーの更新が少し遅れても、最終的に更新が行われることは確実なので問題ありません。ただし、リストが順不同になると、事態は混乱する可能性があります)。問題は、このようにリストを実装すると、非常に多くのスレッドが生成され、1 つにつき 1 つNode(リストに数千のノードがある) になることです。それらのほとんどは、特定の時点でブロックされますが、数千のスレッドが発生することを懸念しています。スレッドをブロックすると、オーバーヘッドが非常に高くなります。

もう 1 つのオプションは、updateメソッドを同期させ、Runnableからインターフェイスを削除するNodeことです。これにより、2 つのスレッドが で更新をエンキューしNode、別のスレッドでこれらの更新を処理するのではなく、2 つのスレッドが交互にNode#updateメソッドを実行します。問題は、これがボトルネックになる可能性があることです。8 つの異なるスレッドがすべて一度に同じノードを更新することを決定した場合、キューの実装は問題なくスケーリングされますが、同期された実装は 8 つのスレッドのうち 7 つをブロックします (その後、6 つのスレッド、次に 5 つのスレッドをブロックするなど)。

したがって、私の質問は、スレッドの数を減らすことを除いて、キューの実装のようなものをどのように実装するか、または潜在的なボトルネックの問題がないことを除いて、同期された実装のようなものをどのように実装するかです。

4

1 に答える 1

0

ThreadPoolExecutorのようなものでこれを解決できるかもしれないと思います

public class Node {
    private int key;
    private Node prev, next;
    private ConcurrentLinkedQueue<Integer> queue;
    private AtomicBoolean lock = new AtomicBoolean(false);
    private ThreadPoolExecutor executor;
    private UpdateNode updater = new UpdateNode();

    public void update(int i) {
        queue.offer(i);
        if(lock.compareAndSet(false, true)) {
            executor.execute(updater);
        }
    }

    private class UpdateNode implements Runnable {
        public void run() {
            do {
                try {
                    int temp = key;
                    while(!queue.isEmpty()) {
                        temp += queue.poll();
                    }
                    if(prev.getKey() > temp) {
                        // remove node, update key to temp, perform backward linear traversal, and insert
                    } else if(next.getKey() < temp) {
                        // remove node, update key to temp, perform forward linear traveral, and insert
                    } else {
                        key = temp; // node doesn't change position
                    }
                } finally {
                    lock.set(false);
                }
            } while (!queue.isEmpty() && lock.compareAndSet(false, true));
        }
    }
}

このようにして、1,000 のスレッドがブロックされたままになることなく、キュー アプローチの利点を得ることができますUpdateNode。に依存して、数千のランナブルを安価に作成できます。UpdateNodeNodeAtomicBooleanThreadPoolExecutor

于 2013-05-14T13:35:08.580 に答える