2

タスクのパイプラインがあり (パイプライン内の各タスクには異なる並列処理要件があります)、各タスクは異なる ExecutorService で動作します。タスクはデータのパケットで動作するため、10 個のデータ パケットがある場合、10 個のタスクが に送信されservice1、データ パケットごとに 1 つのタスクが送信されます。にサブミットされたタスクservice1が実際に呼び出されると、新しいタスクをサブミットしてデータパケットをさらに処理することがservice2できservice3ます。

次のコードは正常に動作します。

  • shutdown()service1すべてが送信された後に呼び出されますservice1
  • その後、shutdown() の前に送信されたすべてのタスクが実際に実行を完了するまで、awaitTermination() は返されません。--その後、 onがshutdown()呼び出されますが、送信されたすべてのタスクが完了し、すべてのタスクが送信されているためです。-- などservice2service1service2service1service2shutdown()service2service3

    ExecutorService[] services = {
            service1,
            service2,
            service3};
    
    int count = 0;
    for(ExecutorService service: services)
    {   
        service.shutdown();
        service.awaitTermination(1, TimeUnit.HOURS);
    }
    

ただしservice2、データパケットを小さなパケットに分割して追加のタスクを送信できるケースを追加したservice2ところ、コードが失敗しています。問題は、すべてのタスクが完了するshutdown()と呼び出されることですが、実行中のタスクから追加のタスクを送信する必要があります。service2service1service2service2

私の質問:

  1. 送信されたすべてのタスクの実行が終了した後に再実行しますかshutdown()、それともすぐに戻りますが、既に送信されたタスクの実行を停止しませんか? 更新:以下に回答
  2. 新しい問題を解決するにはどうすればよいですか?
4

5 に答える 5

2

「シャットダウン」は、プールにそれ以上の作業を受け入れないように指示するだけです。それ以上は何もしません。提出された既存の作業はすべて通常どおり実行されます。キューが空になると、プールは実際にすべてのスレッドを破棄して終了します。

ここでの問題は、service2のタスクが追加のタスクをservice2に送信して処理すると言っていることです。実際にシャットダウンをいつ呼び出すべきかを知る方法はないようです。しかし、残念ながら、これらの小さなパケットがさらにサービスに分解されないと仮定すると、別の方法があります。

List<Future<Void>> service2Futures = new ArrayList<Future<Void>>();

service2Futures.add(service2.submit(new Callable<Void>() {
  public Void call() throws Exception {
    // do your work, submit more stuff to service2
    // if you submit Callables, you could use Future.get() to wait on those
    // results.
    return null;
  }
}));

for (Future<Void> future : service2Futures) {
  future.get();
}

service2.shutdown();
...

ここで行われているのは、最上位の送信済みタスクのFutureオブジェクトを格納していることです(RunnableではなくCallableを使用する必要があります)。送信後すぐにプールをシャットダウンする代わりに、Futureオブジェクトを収集するだけです。次に、それらを循環し、それぞれに対してget()を呼び出すことにより、すべての実行が完了するまで待ちます。「get()」メソッドは、そのタスクを実行しているスレッドが完了するまでブロックします。

その時点で、すべてのトップレベルのタスクが完了し、2番目のレベルのタスクが送信されます。これで、シャットダウンを発行できます。これは、第2レベルのタスクがservice2にそれ以上のものを送信しないことを前提としています。

これはすべて言われていますが、Java 7を使用している場合は、代わりにForkJoinPoolとRecursiveTaskを検討することを検討する必要があります。それはおそらくあなたがしていることにとってより理にかなっています。

ForkJoinPool forkJoinPool = new ForkJoinPool();
RecursiveAction action = new RecursiveAction() {
    protected void compute() {
         // break down here and build actions
         RecursiveAction smallerActions[] = ...; 
         invokeAll(smallerActions);
    }
};

Future<Void> future = forkJoinPool.submit(action);
于 2012-06-08T13:40:05.253 に答える
1

ExecutorService#shutdownすでに送信されたタスクが実行していることをすべて終了できるようにします-javadocextract

以前に送信されたタスクが実行される正常なシャットダウンを開始しますが、新しいタスクは受け入れられません。すでにシャットダウンしている場合、呼び出しによる追加の効果はありません。
このメソッドは、以前に送信されたタスクが実行を完了するのを待ちません。これを行うには、awaitTerminationを使用します。

shutdown実際には、への呼び出しはいくつかのことを行うと考えることができます。

  1. もう新しい仕事を受け入れることはExecutorServiceできません
  2. 既存のスレッドは、実行が終了すると終了します

だからあなたの質問に答えるために:

  • service1電話をかける前にすべてのタスクをservice1.shutdown送信した場合(その呼び出しの後に何かを送信すると、とにかく例外が発生します)、問題ありません(つまり、これらのタスクがservice2に何かを送信し、service2がシャットダウンされていない場合、それらは実行されます) 。
  • shutdownはすぐに戻り、すでに送信されたタスクが停止することを保証しません(それらは永久に実行される可能性があります)。
  • あなたが抱えている問題は、おそらく、あるサービスから別のサービスにタスクを送信する方法に関連しており、提供した情報だけで問題を解決するのは難しいようです。

最良の方法は、あなたが見ている行動を再現するSSCCEを質問に含めることです。

于 2012-06-08T12:05:24.853 に答える
1

ExecutorService をシャットダウンする代わりに、タスク自体を追跡する必要があります。未処理の作業を追跡するために使用する「ジョブ状態」オブジェクトを渡すことができます。

public class JobState {
  private int _runningJobs;

  public synchronized void start() {
    ++_runningJobs;
  }

  public synchronized void finish() {
    --_runningJobs;
    if(_runningJobs == 0) { notifyAll(); }
  }

  public synchronized void awaitTermination() {
    while(_runningJobs > 0) { wait() }
  }
}

public class SomeJob implements Runnable {
  private final JobState _jobState;

  public void run() {
    try {
      // ... do work here, possibly submitting new jobs, and pass along _jobState
    } finally {
      _jobState.finish();
    }
  }
}

// utility method to start a new job
public static void submitJob(ExecutorService executor, Runnable runnable, JobState jobState) {
  // call start _before_ submitting
  jobState.start();
  executor.submit(runnable);
}

// start main work
JobState state = new JobState();
Runnable firstJob = new SomeJob(state);
submitJob(executor, firstJob, state);
state.awaitTermination();
于 2012-06-08T12:59:12.660 に答える
0

shutdown を呼び出すと、すべてのタスクが終了するのを待ちません。awaitTermination と同じように実行してください。

ただし、シャットダウンが呼び出されると、新しいタスクがブロックされます。エグゼキュータ サービスはすべての新しいタスクを拒否します。ThreadPoolExecutor の場合、RejectedExecutionHandler で拒否されたタスク ハンドル。カスタム ハンドラを指定すると、シャットダウン後に拒否されたタスクを処理できます。これは回避策の 1 つです。

于 2012-06-08T12:26:26.760 に答える
0

マットの質問はうまくいくように見えますが、新しい問題が発生する可能性があることを懸念しています.

少しぎこちなく見えますが、私のシナリオでは多くのコード変更なしで機能するソリューションを思いつきました

service2 と同じタスクを実行する新しいサービス (service2a) を導入しました。サービス 2 のタスクが小さなデータ パケットを送信する場合、それはサービス 2 ではなくサービス 2a に送信されるため、サービス 2 がシャットダウンする前にすべてのサブパケットがサービス 2a に送信されます。小さいデータ パケットをさらにサブパケットに分割する必要がなく、サブパケットのアイデアは service2(a) にのみ適用され、他のサービスには適用されないため、これは私にとってはうまくいきます。

于 2012-06-08T14:10:13.193 に答える