私は、オファー/ポーリングごとに同期や CAS 操作を行わずに、1 つのコンシューマーとプロデューサーが同時にキューから要素をオファー/ポーリングできるようにするキューを開発しました。代わりに、キューの末尾セクションが空の場合、1 つのアトミック操作のみが必要です。このキューは、キューがバッファリングされ、コンシューマーがプロデューサーに追いつかない場合に、レイテンシーを短縮することを目的としています。
質問では、実装をレビューしたいと思います(コードはまだ他の誰にもレビューされていないので、セカンドオピニオンを取得するのは素晴らしいことです)、レイテンシを大幅に削減すると思われる使用パターンと、このアーキテクチャができるかどうかについて議論したいと思いますおそらく LMAX ディスラプターよりも高速に実行されます。
コードは github にあります: https://github.com/aranhakki/experimental-performance/blob/master/java/src/concurrency/messaging/ConcurrentPollOfferArrayQueue.java
/*
* Copyright 2014 Aran Hakki
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package concurrency.messaging;
// A non-blocking queue which allows concurrent offer and poll operations with minimal contention.
// Contention in offer and poll operations only occurs when offerArray, which acts as an incomming message buffer,
// becomes full, and we must wait for it too be swapped with the pollArray, acting as a outgoing message buffer,
// the most simple analogy would be too imaging two buckets, one we fill and at the same time we empty another bucket
// which already contains some liquid, then at the point the initial bucket becomes full, we swap it with the bucket that
// is being emptied.
// It's possible that this mechanism might be faster than the LMAX disruptor, need to create tests to confirm.
public final class ConcurrentPollOfferArrayQueue<T> {
private T[] offerArray;
private T[] pollArray;
public ConcurrentPollOfferArrayQueue(T[] _pollArray){
offerArray = (T[]) new Object[_pollArray.length];
pollArray = _pollArray;
}
private int offerIndex = 0;
private int pollIndex = 0;
public void offer(T t){
if (offerIndex<offerArray.length){
offerArray[offerIndex] = t;
offerIndex++;
} else {
while(!arrayFlipped){
}
arrayFlipped = false;
offerIndex = 0;
offer(t);
}
}
private volatile boolean arrayFlipped = false;
public T poll(){
if (pollIndex<pollArray.length){
T t = pollArray[pollIndex];
pollArray[pollIndex] = null;
pollIndex++;
return t;
} else {
pollIndex = 0;
T[] pollArrayTmp = pollArray;
pollArray = offerArray;
offerArray = pollArrayTmp;
arrayFlipped = true;
return poll();
}
}
}
複数のプロデューサーとコンシューマーがすべて同じキューを参照する代わりに、これらのキューの多くを使用することで、待ち時間を大幅に短縮できると思います。
プロデューサー A、B、C がすべて単一のキュー Q を参照し、コンシューマー E、E、および F がすべて同じキューを参照しているとします。これにより、次の一連の関係が発生し、多くの競合が発生します。
A は Q に書き込みます
B は Q に書き込みます
C は Q に書き込みます
E は Q に書き込みます
D は Q に書き込みます
F は Q に書き込みます
私が開発したキューを使用すると、各プロデューサーと単一のコンシューマー集約スレッドの間にキューを作成できます。このスレッドは、各プロデューサー キューの末尾の要素を取得し、それらをコンシューマー キューの先頭に配置します。これにより、メモリのセクションへのライターが 1 つしかないため、競合が大幅に減少します。リレーションシップは次のようになります。
A writesTo headOf(AQ)
B writesTo headOf(BQ)
C writesTo headOf(CQ)
ConsumerAggregationThread writesTo tailOf(AQ)
ConsumerAggregationThread writesTo tailOf(BQ)
ConsumerAggregationThread writesTo tailOf(CQ)
ConsumerAggregationThread writesTo headOf(EQ)
ConsumerAggregationThread writesTo headOf(FQ)
ConsumerAggregationThread writesTo headOf(GQ)
E writesTo tailOf(EQ)
F writesTo tailOf(FQ)
G writesTo tailOf(GQ)
上記の関係により、単一ライターの原則が保証されます。
私はあなたの考えを聞きたいです。