1

例外をスローするスレッドの場合、例外をスローしなかったすべてのスレッドが終了するまでどのように待つことができますか(したがって、すべてが停止するまでユーザーは再度起動しません)?

私はGParsをいくつかの異なる方法で使用しているので、それぞれに戦略が必要です(並列コレクション、非同期クロージャ、およびフォーク/結合)。例外は埋もれておらず、promiseやgetChildrenResultsなどを介して適切に処理されるため、問題にはなりません(Vaclav Pechの回答に感謝します)。まだ実行中の何かが完了するか、そうでなければ停止されるまで、メインスレッドが待機することを確認する必要があります。

たとえば、並列コレクションを使用する場合、一部のスレッドは実行を継続しますが、例外後に起動しないスレッドもあります。ですから、何人が待っているのか、あるいはそれらを手に入れるのかを知るのは簡単ではありません。

私の推測では、スレッドプール(この場合はGParsPool)を操作する方法があるかもしれません。助言がありますか?

ありがとう!

4

2 に答える 2

3

私は問題の解決策があると信じています。徹底的なテストの後にアプリケーションに実装しましたが、機能します。

withPoolクロージャは、作成されたプール(jsr166y.ForkJoinPool)を最初の引数として渡します。それを取得して変数(currentPool)に保存し、後でメインスレッドで使用できるようにすることができます。

    GParsPool.withPool { pool ->
        currentPool = pool

例外がスローされ、処理のためにメインスレッドに戻ると、次のように、すべてが終了するまで待機させることができます。

    } catch (Exception exc) {
        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        println 'all done'
    }

isQuiescent()は、これ以上実行する必要のない作業がないことを確認するための安全な方法のようです。

テスト中に、私が当初考えていたように、例外がループの実行を終了させて​​いないように見えることにも注意してください。500のリストがあり、eachParallelを実行した場合、最初の1つにエラーがあったかどうかに関係なく、すべて実行されました。そのため、並列ループの例外ハンドラー内でcurrentPool.shutdownNow()を使用してループを終了する必要がありました。参照:GPars-並列コレクションを早期に終了する適切な方法

これは、実際のソリューションの完全な簡略化された表現です。

void example() {
    jsr166y.ForkJoinPool currentPool

    AtomicInteger threadCounter = new AtomicInteger(0)
    AtomicInteger threadCounterEnd = new AtomicInteger(0)

    AtomicReference<Exception> realException = new AtomicReference<Exception>()

    try {
        GParsPool.withPool { pool ->
            currentPool = pool

            (1..500).eachParallel {
                try {
                    if (threadCounter.incrementAndGet() == 1) {
                        throw new RuntimeException('planet blew up!')
                    }

                    if (realException.get() != null) {
                        // We had an exception already in this eachParallel - quit early
                        return
                    }

                    // Do some long work
                    Integer counter=0
                    (1..1000000).each() {counter++}

                    // Flag if we went all the way through
                    threadCounterEnd.incrementAndGet()
                } catch (Exception exc) {
                    realException.compareAndSet(null, exc)

                    pool.shutdownNow()
                    throw realException
                }
            }
        }
    } catch (Exception exc) {
        // If we used pool.shutdownNow(), we need to look at the real exception.
        // This is needed because pool.shutdownNow() sometimes generates a CancellationException
        // which can cover up the real exception that caused us to do a shutdownNow().
        if (realException.get()) {
            exc = realException.get()
        }

        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        // Do further exception handling here...
        exc.printStackTrace()
    }
}

前の例に戻ると、4コアマシンで初めて例外をスローした場合、約5つのスレッドがキューに入れられていました。shutdownNow()は、約20かそこらのスレッドが通過した後に物事を遮断するので、上部の近くに「早くやめる」チェックを入れると、それらの20かそこらをできるだけ早くやめるのに役立ちました。

私がここで得たすべての助けの見返りに、それが他の誰かを助ける場合に備えて、ここに投稿するだけです。ありがとう!

于 2012-12-11T17:04:07.463 に答える
2

例外をキャッチしてから、期待される結果以外のもの(たとえば、数値を期待している場合は文字列やnullなど)を返す必要があると思います。

@Grab('org.codehaus.gpars:gpars:0.12')
import static groovyx.gpars.GParsPool.*

def results = withPool {
  [1,2,3].collectParallel {
    try {
      if( it % 2 == 0 ) {
        throw new RuntimeException( '2 fails' )
      }
      else {
        Thread.sleep( 2000 )
        it
      }
    }
    catch( e ) { e.class.name }
  }
}
于 2012-12-07T21:46:56.047 に答える