0

私は、オファー/ポーリングごとに同期や CAS 操作を行わずに、1 つのコンシューマーとプロデューサーが同時にキューから要素をオファー/ポーリングできるようにするキューを開発しました。代わりに、キューの末尾セクションが空の場合、1 つのアトミック操作のみが必要です。このキューは、キューがバッファリングされ、コンシューマーがプロデューサーに追いつかない場合に、レイテンシーを短縮することを目的としています。

質問では、実装をレビューしたいと思います(コードはまだ他の誰にもレビューされていないので、セカンドオピニオンを取得するのは素晴らしいことです)、レイテンシを大幅に削減すると思われる使用パターンと、このアーキテクチャができるかどうかについて議論したいと思いますおそらく LMAX ディスラプターよりも高速に実行されます。

コードは github にあります: https://github.com/aranhakki/experimental-performance/blob/master/java/src/concurrency/messaging/ConcurrentPollOfferArrayQueue.java

/*
 * Copyright 2014 Aran Hakki
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package concurrency.messaging;

// A non-blocking queue which allows concurrent offer and poll operations with minimal contention.
// Contention in offer and poll operations only occurs when offerArray, which acts as an incomming message buffer,
// becomes full, and we must wait for it too be swapped with the pollArray, acting as a outgoing message buffer,
// the most simple analogy would be too imaging two buckets, one we fill and at the same time we empty another bucket
// which already contains some liquid, then at the point the initial bucket becomes full, we swap it with the bucket that
// is being emptied. 

// It's possible that this mechanism might be faster than the LMAX disruptor, need to create tests to confirm.

public final class ConcurrentPollOfferArrayQueue<T> {

    private T[] offerArray;
    private T[] pollArray;

    public ConcurrentPollOfferArrayQueue(T[] _pollArray){
        offerArray = (T[]) new Object[_pollArray.length];
        pollArray = _pollArray;
    }

    private int offerIndex = 0;
    private int pollIndex = 0;

    public void offer(T t){
        if (offerIndex<offerArray.length){
            offerArray[offerIndex] = t;
            offerIndex++;
        } else {
            while(!arrayFlipped){

            }
            arrayFlipped = false;
            offerIndex = 0;
            offer(t);
        }
    }

    private volatile boolean arrayFlipped = false;

    public T poll(){
        if (pollIndex<pollArray.length){
            T t = pollArray[pollIndex];
            pollArray[pollIndex] = null;
            pollIndex++;
            return t;
        } else {
            pollIndex = 0;
            T[] pollArrayTmp = pollArray;
            pollArray = offerArray;
            offerArray = pollArrayTmp;
            arrayFlipped = true;
            return poll();
        }

    }

}

複数のプロデューサーとコンシューマーがすべて同じキューを参照する代わりに、これらのキューの多くを使用することで、待ち時間を大幅に短縮できると思います。

プロデューサー A、B、C がすべて単一のキュー Q を参照し、コンシューマー E、E、および F がすべて同じキューを参照しているとします。これにより、次の一連の関係が発生し、多くの競合が発生します。

A は Q に書き込みます

B は Q に書き込みます

C は Q に書き込みます

E は Q に書き込みます

D は Q に書き込みます

F は Q に書き込みます

私が開発したキューを使用すると、各プロデューサーと単一のコンシューマー集約スレッドの間にキューを作成できます。このスレッドは、各プロデューサー キューの末尾の要素を取得し、それらをコンシューマー キューの先頭に配置します。これにより、メモリのセクションへのライターが 1 つしかないため、競合が大幅に減少します。リレーションシップは次のようになります。

A writesTo headOf(AQ)

B writesTo headOf(BQ)

C writesTo headOf(CQ)

ConsumerAggregationThread writesTo tailOf(AQ)

ConsumerAggregationThread writesTo tailOf(BQ)

ConsumerAggregationThread writesTo tailOf(CQ)

ConsumerAggregationThread writesTo headOf(EQ)

ConsumerAggregationThread writesTo headOf(FQ)

ConsumerAggregationThread writesTo headOf(GQ)

E writesTo tailOf(EQ)

F writesTo tailOf(FQ)

G writesTo tailOf(GQ)

上記の関係により、単一ライターの原則が保証されます。

私はあなたの考えを聞きたいです。

4

1 に答える 1

0

皆さんはこの実装についてどう思いますか? pollQueue が空になると、ポーリング スレッドがキュー スイッチをトリガーするように変更しました。

/*
* Copyright 2014 Aran Hakki
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/



/* 
* A non-blocking queue which allows concurrent offer and poll operations with    minimal contention.
* Contention in offer and poll operations only occurs when pollQueue is empty and must be swapped with offer queue.
* This implementation does not make use of any low level Java memory optimizations e.g. using the Unsafe class or direct byte buffers,
* so its possible it could run much faster.
* If re-engineered to use lower level features its possible that this approach might be faster than the LMAX disruptor.
* I'm current observing an average latency of approx 6000ns.
*/

package concurrency.messaging;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentPollOfferQueue<T> {

private class ThreadSafeSizeQueue<T> {
    private Queue<T> queue = new LinkedList<T>();
    private volatile AtomicInteger size = new AtomicInteger(0);

    public int size(){
        return size.get();
    }

    public void offer(T value){
        queue.offer(value);
        size.incrementAndGet();
    }

    public T poll(){
        T value = queue.poll();
        if (value!=null){
            size.decrementAndGet();
        }
        return value;
    }
}

private volatile ThreadSafeSizeQueue<T> offerQueue;
private volatile ThreadSafeSizeQueue<T> pollQueue;

private int capacity;

public ConcurrentPollOfferQueue(int capacity){
    this.capacity = capacity;
    offerQueue = new ThreadSafeSizeQueue<T>();
    pollQueue = new ThreadSafeSizeQueue<T>();
}

public void offer(T value){
    while(offerQueue.size()==capacity){/* wait for consumer to finishing consuming pollQueue */}
    offerQueue.offer(value);
}

public T poll(){
    T polled;
    while((polled = pollQueue.poll())==null){
        if (pollQueue.size()==0){
            ThreadSafeSizeQueue<T> tmpQueue = offerQueue;
            offerQueue = pollQueue;
            pollQueue = tmpQueue;
        } 
    }
    return polled;
}

public boolean isEmpty(){
    return pollQueue.size()==0;
}
于 2015-03-12T01:08:54.620 に答える