6

を使用していくつかのコードを並列化しようとしましたがconcurrent.futures.ProcessPoolExecutor、 では発生しない奇妙なデッドロックが発生し続けていますThreadPoolExecutor。最小限の例:

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        executor.submit(test)

Python 3.2.2 (64 ビット Ubuntu 上) では、これはすべてのジョブを送信した後に一貫してハングしているように見えます。これは、送信されたジョブの数がワーカーの数よりも多い場合に発生するようです。私が交換ProcessPoolExecutorした場合、ThreadPoolExecutorそれは問題なく動作します。

調査の試みとして、各 Future にi次の値を出力するためのコールバックを指定しました。

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        future = executor.submit(test)

        def callback(f):
            print('callback {}'.format(i))
        future.add_done_callback(callback)

これは私をさらに混乱させました-i出力された bycallbackの値は、定義されたときではなく、呼び出されたときの値です(したがって、表示されませんcallback 0が、多くのcallback 99s が得られます)。繰り返しThreadPoolExecutorますが、期待値を出力します。

これはバグではないかと思い、最近開発された Python のバージョンを試してみました。これで、コードは少なくとも終了したように見えますが、それでも間違った値がi出力されます。

誰でも説明できます:

  • ProcessPoolExecutorPython 3.2 と、このデッドロックを明らかに修正した現在の開発バージョンの間で何が起こったのか

  • の「間違った」値iが表示される理由

編集: jukiewicz が以下で指摘したように、もちろん、印刷iするとコールバックが呼び出されたときに値が印刷されます。何を考えていたのかわかりません...i属性の 1 つとして値を持つ呼び出し可能なオブジェクトを渡す場合、期待どおりに動作します。

EDIT:もう少し情報:すべてのコールバックが実行されるため、プロセスが完了したことを伝えることができないexecutor.shutdown(によって呼び出される)ようです。これは現在の python 3.3 で完全に修正されたようですが、とexecutor.__exit__に多くの変更があったようです。そのため、何が修正されたのかわかりません。私は 3.3 を使用できないので (numpy のリリース バージョンまたは開発バージョンと互換性がないようです)、そのマルチプロセッシング パッケージとコンカレント パッケージを 3.2 インストールに単純にコピーしてみましたが、問題なく動作しているようです。それでも、私が見る限り、最新のリリース バージョンでは完全に壊れているのに、他の誰も影響を受けていないというのは少し奇妙に思えます。multiprocessingconcurrent.futuresProcessPoolExecutor

4

1 に答える 1

3

次のようにコードを変更して、両方の問題を解決しました。callbackfunction はクロージャーとして定義されていたため、i毎回更新された値を使用していました。デッドロックに関しては、すべてのタスクが完了する前に Executor をシャットダウンする原因になる可能性があります。先物が完了するのを待つこともそれを解決します。

from concurrent import futures

def test(i):
    return i

def callback(f):
    print('callback {}'.format(f.result()))


with futures.ProcessPoolExecutor(4) as executor:
    fs = []
    for i in range(100):
        print('submitting {}'.format(i))
        future = executor.submit(test, i)
        future.add_done_callback(callback)
        fs.append(future)

    for _ in futures.as_completed(fs): pass

更新: 申し訳ありませんが、更新を読んでいません。これは既に解決されているようです。

于 2012-03-04T08:34:53.070 に答える