次のことを行うラッチ実装があるかどうかは誰にもわかりますか?
- ラッチ値をデクリメントする方法、または値がゼロの場合は待機する方法があります
- ラッチ値がゼロになるのを待つ方法があります
- ラッチの値に数値を追加する方法があります
次のことを行うラッチ実装があるかどうかは誰にもわかりますか?
Phaser (java.util.concurrent.Phaser)を使用することもできます。
final Phaser phaser = new Phaser(1); // register self
while (/* some condition */) {
phaser.register(); // Equivalent to countUp
// do some work asynchronously, invoking
// phaser.arriveAndDeregister() (equiv to countDown) in a finally block
}
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete
これが役立つことを願っています。
java.util.concurrent.Semaphoreは法案に合っているようです。
(*) セマフォが利用できなくなるまで待機する組み込みメソッドはありません。最初に実行する独自のラッパーを作成acquire
し、tryAcquire
それが失敗した場合は「ビジーイベント」をトリガーします(通常のを使用し続けますacquire
)。誰もがあなたのラッパーを呼び出す必要があります。多分セマフォのサブクラス?
AQS から始める代わりに、以下のような単純な実装を使用できます。これはややナイーブですが (AQS ロックフリー アルゴリズムに対して同期化されています)、満足のいくシナリオで使用することを期待しない限り、それで十分です。
public class CountUpAndDownLatch {
private CountDownLatch latch;
private final Object lock = new Object();
public CountUpAndDownLatch(int count) {
this.latch = new CountDownLatch(count);
}
public void countDownOrWaitIfZero() throws InterruptedException {
synchronized(lock) {
while(latch.getCount() == 0) {
lock.wait();
}
latch.countDown();
lock.notifyAll();
}
}
public void waitUntilZero() throws InterruptedException {
synchronized(lock) {
while(latch.getCount() != 0) {
lock.wait();
}
}
}
public void countUp() { //should probably check for Integer.MAX_VALUE
synchronized(lock) {
latch = new CountDownLatch((int) latch.getCount() + 1);
lock.notifyAll();
}
}
public int getCount() {
synchronized(lock) {
return (int) latch.getCount();
}
}
}
注: 詳細なテストは行っていませんが、期待どおりに動作するようです:
public static void main(String[] args) throws InterruptedException {
final CountUpAndDownLatch latch = new CountUpAndDownLatch(1);
Runnable up = new Runnable() {
@Override
public void run() {
try {
System.out.println("IN UP " + latch.getCount());
latch.countUp();
System.out.println("UP " + latch.getCount());
} catch (InterruptedException ex) {
}
}
};
Runnable downOrWait = new Runnable() {
@Override
public void run() {
try {
System.out.println("IN DOWN " + latch.getCount());
latch.countDownOrWaitIfZero();
System.out.println("DOWN " + latch.getCount());
} catch (InterruptedException ex) {
}
}
};
Runnable waitFor0 = new Runnable() {
@Override
public void run() {
try {
System.out.println("WAIT FOR ZERO " + latch.getCount());
latch.waitUntilZero();
System.out.println("ZERO " + latch.getCount());
} catch (InterruptedException ex) {
}
}
};
new Thread(waitFor0).start();
up.run();
downOrWait.run();
Thread.sleep(100);
downOrWait.run();
new Thread(up).start();
downOrWait.run();
}
出力:
IN UP 1
UP 2
WAIT FOR ZERO 1
IN DOWN 2
DOWN 1
IN DOWN 1
ZERO 0
DOWN 0
IN DOWN 0
IN UP 0
DOWN 0
UP 0
これは のバリエーションでCounterLatch
、Apache サイトから入手できます。
それらのバージョンは、変数 ( ) が特定の値にある 間、呼び出し元のスレッドをブロックします。AtomicInteger
しかし、このコードを簡単に微調整して、Apache バージョンが何を行うかを選択できるようにするか、「カウンターが特定の値に達するまでここで待機する」ように指定できます。後者の方が適用範囲が広いと考えられます。私の特定のケースでは、すべての「チャンク」が公開されていることを確認したかったので、これをざわめきましたSwingWorker.process()
...しかし、それ以来、他の用途を見つけました。
ここでは、公式には世界最高の言語 (TM) である Jython で記述されています。やがてJavaバージョンをざわめくつもりです。
class CounterLatch():
def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
self.count = java.util.concurrent.atomic.AtomicLong( initial )
self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )
class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
def tryAcquireShared( sync_self, arg ):
if lift_on_reached:
return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
else:
return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
def tryReleaseShared( self, args ):
return True
self.sync = Sync()
self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False
def await( self, *args ):
if args:
assert len( args ) == 2
assert type( args[ 0 ] ) is int
timeout = args[ 0 ]
assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
unit = args[ 1 ]
return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
else:
self.sync.acquireSharedInterruptibly( 1 )
def count_relative( self, n ):
previous = self.count.addAndGet( n )
if previous == self.signal.get():
self.sync.releaseShared( 0 )
return previous
NB Apache バージョンでは、 andのキーワードvolatile
を使用します。Jython では、これ自体は存在しないと思いますが、andを使用すると、どのスレッドでも値が「古くなっている」ことがないようにする必要があります。signal
released
AtomicInteger
AtomicBoolean
使用例:
SwingWorker コンストラクターで:
self.publication_counter_latch = CounterLatch()
SW.publish で:
# increase counter value BEFORE publishing chunks
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )
SW.プロセス:
# ... do sthg [HERE] with the chunks!
# AFTER having done what you want to do with your chunks:
self.publication_counter_latch.count_relative( - len( chunks ) )
チャンク処理が停止するのを待っているスレッドで:
worker.publication_counter_latch.await()
私はそれが必要で、AQS (ノンブロッキング) を使用する CountDownLatch と同じ戦略を使用して構築しました。このクラスは、Apache Camel 用に作成されたものと (正確ではないにしても) 非常に似ています。JDK Phaser よりも軽いと思います。 JDK の CountDownLact と同じように動作し、ゼロ未満のカウントダウンは許可されず、カウントダウンとカウントアップが許可されます。
import java.util.concurrent.TimeUnit; java.util.concurrent.locks.AbstractQueuedSynchronizer をインポートします。
public class CountingLatch
{
/**
* Synchronization control for CountingLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer
{
private Sync()
{
}
private Sync(final int initialState)
{
setState(initialState);
}
int getCount()
{
return getState();
}
protected int tryAcquireShared(final int acquires)
{
return getState()==0 ? 1 : -1;
}
protected boolean tryReleaseShared(final int delta)
{
// Decrement count; signal when transition to zero
for(; ; ){
final int c=getState();
final int nextc=c+delta;
if(nextc<0){
return false;
}
if(compareAndSetState(c,nextc)){
return nextc==0;
}
}
}
}
private final Sync sync;
public CountingLatch()
{
sync=new Sync();
}
public CountingLatch(final int initialCount)
{
sync=new Sync(initialCount);
}
public void increment()
{
sync.releaseShared(1);
}
public int getCount()
{
return sync.getCount();
}
public void decrement()
{
sync.releaseShared(-1);
}
public void await() throws InterruptedException
{
sync.acquireSharedInterruptibly(1);
}
public boolean await(final long timeout) throws InterruptedException
{
return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout));
}
}