11

タスクが完了するまでそれ以上のリクエストを破棄しながら、タスクを非同期で実行する必要があります。

メソッドを同期すると、タスクがキューに入れられるだけで、スキップされません。最初は SingleThreadExecutor を使用することを考えていましたが、それもタスクをキューに入れます。次に ThreadPoolExecutor を調べましたが、キューを読み取って実行するタスクを取得するため、1 つのタスクが実行され、少なくとも 1 つのタスクがキューに入れられます (その他は ThreadPoolExecutor.DiscardPolicy を使用して破棄できます)。

私が考えることができる唯一のことは、セマフォを使用してキューをブロックすることです。私が達成しようとしていることを示すために、次の例を用意しました。もっと簡単な方法はありますか?明らかな何かを見逃しましたか?

import java.util.concurrent.*;

public class ThreadPoolTester {
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    private static Semaphore processEntry = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        if (!processEntry.tryAcquire()) return;
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        try {
                            System.out.println("start " + index);
                            Thread.sleep(1000); // pretend to do work
                            System.out.println("stop " + index);
                            return null;

                        } finally {
                            processEntry.release();
                        }
                    }
                }
            );
    }
}

サンプル出力

start 0
stop 0
start 5
stop 5
start 10
stop 10
start 15
stop 15

axtavt の回答を取得し、上記の例を変換すると、次のより簡単なソリューションが得られます。

import java.util.concurrent.*;

public class SyncQueueTester {
    private static ExecutorService executor = new ThreadPoolExecutor(1, 1, 
            1000, TimeUnit.SECONDS, 
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        System.out.println("start " + index);
                        Thread.sleep(1000); // pretend to do work
                        System.out.println("stop " + index);
                        return null;
                    }
                }
            );
    }
}
4

2 に答える 2

13

SynchronousQueue目的のポリシーに裏打ちされたエグゼキューターがあなたが望むことをしているように見えます:

executor = new ThreadPoolExecutor(
    1, 1, 
    1000, TimeUnit.SECONDS, 
    new SynchronousQueue<Runnable>(),
    new ThreadPoolExecutor.DiscardPolicy());
于 2011-02-10T08:41:25.357 に答える
0

キューがない場合、エグゼキューターは必要ありません。セマフォを単独で使用するだけで十分なようです。以下のコードを使用して、既に実行されているときに同じコードを実行しないようにしています。semaphoreisであることを確認してくださいstatic volatile。これにより、セマフォがクラスの唯一のセマフォになり、変更されるとすぐにセマフォ参照が他のスレッドのヒープに伝播されます

if (this.getSemaphore().tryAcquire()) {
        try {
            process();
        } catch (Exception e) {
        } finally {
            this.getSemaphore().release();
        }
}
else {
    logger.info(">>>>> Job already running, skipping go");
}
于 2011-02-10T08:45:12.767 に答える