0

固定のスレッドプール サイズで singleton-ExecutorService を作成したいと考えています。別のスレッドがその ExecutorService に Callables をフィードし、実行が完了した直後に Callables の結果を (最適に) 解析したいと考えています。

これを適切に実装する方法が本当にわかりません。私の最初の考えは、「submit(callable)」を介して Callable を ExecutorService に追加し、結果の Future をシングルトン内の HashMap または ArrayList 内に格納する、シングルトン ES のメソッドでした。別のスレッドは、指定された間隔内の結果について Futures をチェックします。

しかし、どういうわけか、この解決策は「適切に感じられない」ものであり、このユースケースの解決策を他の場所で見つけられなかったので、後で後悔することをコーディングする前に皆さんに尋ねています。この問題にどのように取り組みますか?

私はあなたの応答を楽しみにしています!

4

4 に答える 4

1

使用MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_NUMBER));してサービスを作成し、グアバListenableFutureを使用して結果をすぐに解析することも、将来の結果をリッスンするためにバイクを追加することもできます。

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
于 2013-10-23T12:05:33.823 に答える
1

ExecutorCompletionServiceを使用して実装できます。

次の手順は、いくつかの助けになります。

  1. Runtime.getRuntime().availableProcessors()を使用して、使用可能なプロセッサの数を設定します。値を変数 availableProcessors に保持しましょう。

  2. service = Executors.newFixedThreadPool(availableProcessors)のように、ExecutorService を初期化します。

  3. ExecutorCompletionService を初期化し、Callable からの結果が Integer Array Integer[] であると仮定します。ExecutorCompletionService completionService = new ExecutorCompletionService(service)

  4. タスクを送信するには、completionService.submitを使用します。

  5. タスク (呼び出し可能) の各結果を取得するには、completedService.take ().get()を使用します。

上記の手順に基づいて、すべての callable の結果を取得し、必要なビジネスを行うことができます。

于 2013-10-23T12:25:17.937 に答える
1
import java.util.concurrent.*;

public class PostProcExecutor extends ThreadPoolExecutor {

  // adjust the constructor to your desired threading policy
  public PostProcExecutor(int corePoolSize, int maximumPoolSize,
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  }

  @Override
  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable) {
      @Override
      protected void done()
      {
        if(!isCancelled()) try {
          processResult(get());
        } catch(InterruptedException ex) {
          throw new AssertionError("on complete task", ex);
        } catch(ExecutionException ex) {
          // no result available
        }
      }
    };
  }

  protected void processResult(Object o)
  {
    System.out.println("Result "+o);// do your post-processing here
  }
}
于 2013-10-23T12:07:19.173 に答える