3

初期化に時間がかかるサービスの構成設定 (たとえば、JDBC 接続のパラメーター) を受け入れるユーザー インターフェイスを考えてみましょう。サービスの初期化が行われている間、ユーザー インターフェイスの応答性を維持したいと考えています。ユーザーが追加の変更を行った場合は、初期化をキャンセルし、新しいパラメーターを使用して再開する必要があります。

ユーザーが各文字を入力するとパラメーターが構成に組み込まれるため、多数の初期化要求が連続して作成される可能性があります。最後の 1 つだけを実行する必要があります。

この結果を達成するためにまとめたコードがありますが、この動作は ExecutorService として実装するのに非常に適しているようです。すべてを ExecutorService にリファクタリングする前に、似たような実装が既に世界に出回っていないか尋ねてみようと思いました。

もう少し詳しく言うと:

ExecutorService には 1 つのワーカー スレッドがあります。新しいタスクが送信されるとすぐに、現在のタスクはキャンセルされます (そしてワーカーは中断されます)。その後、次の実行のために新しいタスクがキャプチャされます。別のタスクが投入されると、現在のタスクは再びキャンセルされ、「次の実行」タスクはこの新しいタスクに設定されます。ワーカー スレッドが最終的に次の実行タスクを取得すると、それは常に最後に送信されたタスクになります。他のすべてのタスクはキャンセルまたは破棄されます。

喜んで共有したいような実装を持っている人はいますか? または、このタイプの動作をカバーする標準ライブラリはありますか? 実装するのは難しくありませんが、スレッド セーフを確立するのは難しい場合があるため、できれば実証済みのコードを使用したいと考えています。

4

2 に答える 2

3

これが私が最終的に思いついたものです-コメントに興味があります:

public class InterruptingExecutorService extends ThreadPoolExecutor{
    private volatile FutureTask<?> currentFuture;

    public InterruptingExecutorService(boolean daemon) {
        super(0, 1, 1000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), 
                daemon ? new DaemonThreadFactory() : Executors.defaultThreadFactory());

    }

    public static class DaemonThreadFactory implements ThreadFactory{
        ThreadFactory delegate = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = delegate.newThread(r);
            t.setDaemon(true);
            return t;
        }

    }

    private void cancelCurrentFuture(){
        // cancel all pending tasks
        Iterator<Runnable> it = getQueue().iterator();
        while(it.hasNext()){
            FutureTask<?> task = (FutureTask<?>)it.next();
            task.cancel(true);
            it.remove();
        }

        // cancel the current task
        FutureTask<?> currentFuture = this.currentFuture;
        if(currentFuture != null){
            currentFuture.cancel(true);
        }
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();

        cancelCurrentFuture();
        if (!(command instanceof FutureTask)){ // we have to be able to cancel a task, so we have to wrap any non Future
            command = newTaskFor(command, null);
        }
        super.execute(command);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // it is safe to access currentFuture like this b/c we have limited the # of worker threads to only 1
        // it isn't possible for currentFuture to be set by any other thread than the one calling this method
        this.currentFuture = (FutureTask<?>)r;
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // it is safe to access currentFuture like this b/c we have limited the # of worker threads to only 1
        // it isn't possible for currentFuture to be set by any other thread than the one calling this method
        this.currentFuture = null;
    }
}
于 2012-10-21T15:41:58.683 に答える
1

DiscardOldestPolicyをエグゼキュータに追加する必要がある場合があります

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.DiscardOldestPolicy.html

You will get
0 submitted
1 submitted
2 submitted
3 submitted
4 submitted
5 submitted
6 submitted
7 submitted
8 submitted
9 submitted
9 finished

public static void main(String[] args) throws SecurityException,
        NoSuchMethodException {

    final Method interruptWorkers = ThreadPoolExecutor.class
            .getDeclaredMethod("interruptWorkers");
    interruptWorkers.setAccessible(true);

    ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L,
            TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
            new RejectedExecutionHandler() {

                @Override
                public void rejectedExecution(Runnable r,
                        ThreadPoolExecutor executor) {
                    if (!executor.isShutdown()) {
                        try {

                            interruptWorkers.invoke(executor);
                            executor.execute(r);

                        } catch (IllegalArgumentException e) {
                            e.printStackTrace();
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        } catch (InvocationTargetException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });


    for(int i =0 ;i<10;i++)
        executor.submit(newTask(i));
}

private static Runnable newTask(final int id) {
    return new Runnable() {
        {

            System.out.println(id + " submitted");
        }

        @Override
        public void run() {
            try {

                Thread.sleep(5000l);
                System.out.println(id + " finished");
            } catch (InterruptedException e) {
            }

        }

    };
}
于 2012-10-19T05:29:04.970 に答える