2

ForkJoinPoolタスク (RecursiveActionまたは) を送信するために使用しているときに、例外 (キャッチされない) を処理するより良い方法は何RecursiveTaskですか?

ForkJoinPool は、Thread.UncaughtExceptionHandlerWorkerThread が突然終了した場合 (これはとにかく私たちの制御下にはありません)、例外を処理するために を受け入れますが、このハンドラーは がForkJoinTask例外をスローする場合には使用されません。私は実装で標準submit/invokeAll方法を使用しています。

これが私のシナリオです:

サードパーティのシステムからデータを読み取る無限ループでスレッドを実行しています。このスレッドでは、タスクをForkJoinPool

new Thread() {
      public void run() {
         while (true) {
             ForkJoinTask<Void> uselessReturn = 
                   ForkJoinPool.submit(RecursiveActionTask);
         }
      }
 }

私は RecursiveAction を使用しており、いくつかのシナリオでは RecursiveTask を使用しています。これらのタスクはメソッドを使用して FJPool に送信されsubmit()ます。UncaughtExceptionHandlerタスクがチェックされていない/キャッチされていない例外をスローした場合に、例外を処理し、必要に応じてタスクを再送信できるような一般的な例外ハンドラーが必要です。例外を処理することで、タスクの 1 つまたは一部が例外をスローした場合に、キューに入れられたタスクが取り消されないようにすることもできます。

invokeAll()メソッドは一連の ForkJoinTasks を返しますが、これらのタスクは再帰ブロックにあります (各タスクはcompute()メソッドを呼び出し、さらに分割される可能性があります [仮想シナリオ] )

class RecursiveActionTask extends RecursiveAction {

    public void compute() {
       if <task.size() <= ACCEPTABLE_SIZE) {
          processTask() // this might throw an checked/unchecked exception
       } else {
          RecursiveActionTask[] splitTasks = splitTasks(tasks)
          RecursiveActionTasks returnedTasks = invokeAll(splitTasks);
          // the below code never executes as invokeAll submits the tasks to the pool 
          // and the flow never comes to the code below.
          // I am looking for some handling like this
          for (RecusiveActionTask task : returnedTasks) {
             if (task.isDone()) {
                task.getException() // handle this exception
             }
          }
       }
    }

}

3 ~ 4 個のタスクが失敗すると、キュー送信ユニット全体が破棄されることに気付きました。現在、私はtry/catch個人的に好きではないプロセスタスクを回避しています。より一般的なものを探しています。

  1. また、失敗したタスクのすべてのリストを知りたいので、それらを再送信できます
  2. タスクが例外をスローすると、スレッドはプールから追い出されますか?
  3. FutureTask でメソッドを呼び出すget()と、タスクが完了するまで待機するため、フローがシーケンシャルになる可能性が高くなります。
  4. Task が失敗した場合のみ、Task のステータスを知りたい。いつ完了するかは気にしません (明らかに 1 時間後に待ちたくありません)。

上記のシナリオで例外を処理する方法はありますか?

4

2 に答える 2

2

これが Akka での解決方法です。

/**
 * INTERNAL AKKA USAGE ONLY
 */
final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] {
  final override def setRawResult(u: Unit): Unit = ()
  final override def getRawResult(): Unit = ()
  final override def exec(): Boolean = try { mailbox.run; true } catch {
    case anything ⇒
      val t = Thread.currentThread
      t.getUncaughtExceptionHandler match {
        case null ⇒
        case some ⇒ some.uncaughtException(t, anything)
      }
      throw anything
  }
 }
于 2012-04-13T09:12:58.567 に答える