6

プロジェクトの1つでThreadPoolExecutorを使用しているときに、JavaDocからThreadPoolExecutorについて詳しく読み始めました。それで、誰かがこの行が実際に何を意味するのか私に説明できますか?-各パラメータが何を表すかは知っていますが、ここの専門家の何人かからより一般的/素人の方法でそれを理解したいと思いました。

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy());

更新:- 問題の説明は次のとおりです:-

各スレッドは1から1000までの一意のIDを使用し、プログラムは60分以上実行する必要があるため、その60分ですべてのIDが終了する可能性があるため、それらのIDを再利用する必要があります。これは、上記のエグゼキュータを使用して作成した以下のプログラムです。

class IdPool {
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();

    public IdPool() {
        for (int i = 1; i <= 1000; i++) {
            availableExistingIds.add(i);
        }
    }

    public synchronized Integer getExistingId() {
        return availableExistingIds.removeFirst();
    }

    public synchronized void releaseExistingId(Integer id) {
        availableExistingIds.add(id);
    }
}


class ThreadNewTask implements Runnable {
    private IdPool idPool;

    public ThreadNewTask(IdPool idPool) {
        this.idPool = idPool;
    }

    public void run() {
        Integer id = idPool.getExistingId();
        someMethod(id);
        idPool.releaseExistingId(id);
    }

// This method needs to be synchronized or not?
    private synchronized void someMethod(Integer id) {
        System.out.println("Task: " +id);
// and do other calcuations whatever you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        IdPool idPool = new IdPool();   
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while(System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination        
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    }
}

私の質問は次のとおりです。-このコードは、パフォーマンスが考慮されているかどうかに関しては正しいですか?そして、それをより正確にするために、他に何をここで作ることができますか?どんな助けでもありがたいです。

4

4 に答える 4

13

[まず、申し訳ありませんが、これは前の回答に対する回答ですが、フォーマットが必要でした]。

実際の場合を除いて、アイテムがフルキューでThreadPoolExecutorに送信されたときにブロックしないでください。この理由は、ThreadPoolExecutorがBlockingQueue.offer(T item)メソッドを呼び出すためです。これは、定義上、非ブロッキングメソッドです。アイテムを追加してtrueを返すか、追加せず(いっぱいの場合)falseを返します。次に、ThreadPoolExecutorは、登録されたRejectedExecutionHandlerを呼び出して、この状況に対処します。

javadocから:

将来、指定されたタスクを実行します。タスクは、新しいスレッドまたは既存のプールされたスレッドで実行される場合があります。このエグゼキュータがシャットダウンされたか、その容量に達したためにタスクを実行に移すことができない場合、タスクは現在のRejectedExecutionHandlerによって処理されます。

デフォルトでは、ThreadPoolExecutor.AbortPolicy()が使用され、ThreadPoolExecutorの「submit」または「execute」メソッドからRejectedExecutionExceptionがスローされます。

try {
   executorService.execute(new Runnable() { ... });
}
catch (RejectedExecutionException e) {
   // the queue is full, and you're using the AbortPolicy as the 
   // RejectedExecutionHandler
}

ただし、他のハンドラーを使用して、エラーを無視する(DiscardPolicy)、または「execute」または「submit」メソッドを呼び出したスレッドで実行する(CallerRunsPolicy)など、別のことを行うことができます。この例では、キューがいっぱいになったときに、「submit」メソッドまたは「execute」メソッドを呼び出すスレッドで、要求されたタスクを実行できます。(これは、いつでも、プール自体にあるものの上で1つの追加のものを実行できることを意味します):

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy());

ブロックして待機する場合は、キューに使用可能なスロットができるまでブロックする独自のRejectedExecutionHandlerを実装できます(これは概算です。これをコンパイルまたは実行していませんが、アイデアを得る必要があります)。

public class BlockUntilAvailableSlot implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (e.isTerminated() || e.isShutdown()) {
        return;
     }

     boolean submitted = false;
     while (! submitted) {
       if (Thread.currentThread().isInterrupted()) {
            // be a good citizen and do something nice if we were interrupted
            // anywhere other than during the sleep method.
       }

       try {
          e.execute(r);
          submitted = true;
       }
       catch (RejectedExceptionException e) {
         try {
           // Sleep for a little bit, and try again.
           Thread.sleep(100L);
         }
         catch (InterruptedException e) {
           ; // do you care if someone called Thread.interrupt?
           // if so, do something nice here, and maybe just silently return.
         }
       }
     }
  }
}
于 2012-05-27T04:16:55.777 に答える
2

ExecutorServiceスレッドプールの実行を処理するを作成しています。この場合、プール内のスレッドの初期数と最大数はどちらも10です。プール内のスレッドが1秒間(1000ms)アイドル状態になると、そのスレッドは強制終了されます(アイドルタイマー)。ただし、スレッドの最大数とコア数が同じであるため、これは発生しません(常に10スレッドを維持し、 10スレッドを超えて実行しないでください)。

を使用しArrayBlockingQueueて10スロットの実行要求を管理するため、キューがいっぱいになると(10スレッドがエンキューされた後)、呼び出し元がブロックされます。

スレッドが拒否された場合(この場合、サービスのシャットダウンが原因で、スレッドがキューに入れられるか、キューがいっぱいの場合にスレッドをキューに入れるときにブロックされるため)、提供Runnableされたものは呼び出し元のスレッドで実行されます。

于 2012-05-26T23:15:07.850 に答える
0

セマフォを検討してください。これらは同じ目的のためのものです。セマフォを使用したコードについては、以下を確認してください。これがあなたが望むものであるかどうかわからない。ただし、取得する許可がなくなると、これはブロックされます。IDはあなたにとっても重要ですか?

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadNewTask implements Runnable {
    private Semaphore idPool;

    public ThreadNewTask(Semaphore idPool) {
        this.idPool = idPool;
    }

    public void run() {
//      Integer id = idPool.getExistingId();
        try {
            idPool.acquire();
            someMethod(0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            idPool.release();
        }
//      idPool.releaseExistingId(id);
    }

    // This method needs to be synchronized or not?
    private void someMethod(Integer id) {
        System.out.println("Task: " + id);
        // and do other calcuations whatever you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        Semaphore idPool = new Semaphore(100); 
//      IdPool idPool = new IdPool();
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while (System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    }
}
于 2012-05-28T11:27:53.367 に答える
0

もう1つの解決策は、基になるキューをハックofferしてoffer、大きなタイムアウト(最大292年、無限と見なすことができます)に置き換えることです。


// helper method
private static boolean interruptibleInfiniteOffer(BlockingQueue<Runnable> q, Runnable r) {
    try {
        return q.offer(r, Long.MAX_VALUE, TimeUnit.NANOSECONDS); // infinite == ~292 years
    } catch (InterruptedException e) {
        return false;
    }
}

// fixed size pool with blocking (instead of rejecting) if bounded queue is full
public static ThreadPoolExecutor getFixedSizePoolWithLimitedWaitingQueue(int nThreads, int maxItemsInTheQueue) {
    BlockingQueue<Runnable> queue = maxItemsInTheQueue == 0
            ? new SynchronousQueue<>() { public boolean offer(Runnable r) { return interruptibleInfiniteOffer(this, r);} }
            : new ArrayBlockingQueue<>(maxItemsInTheQueue) { public boolean offer(Runnable r) { return interruptibleInfiniteOffer(this, r);} };
    return new ThreadPoolExecutor(nThreads, nThreads, 0, TimeUnit.MILLISECONDS, queue);
}
于 2021-03-07T14:37:21.193 に答える