19

私はPythonでかなり大規模なプロジェクトに取り組んでおり、メインサービスの速度が低下しないように、計算集約型のバックグラウンドタスクの1つを別のコアにオフロードする必要があります。multiprocessing.Queueワーカープロセスの結果を伝達するために使用するときに、明らかに奇妙な動作に遭遇しました。比較の目的でathreading.Threadとaの両方に同じキューを使用すると、スレッドは正常に機能しますが、大きなアイテムをキューに入れた後、プロセスは参加できません。multiprocessing.Process観察:

import threading
import multiprocessing

class WorkerThread(threading.Thread):
    def __init__(self, queue, size):
        threading.Thread.__init__(self)
        self.queue = queue
        self.size = size

    def run(self):
        self.queue.put(range(size))


class WorkerProcess(multiprocessing.Process):
    def __init__(self, queue, size):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.size = size

    def run(self):
        self.queue.put(range(size))


if __name__ == "__main__":
    size = 100000
    queue = multiprocessing.Queue()

    worker_t = WorkerThread(queue, size)
    worker_p = WorkerProcess(queue, size)

    worker_t.start()
    worker_t.join()
    print 'thread results length:', len(queue.get())

    worker_p.start()
    worker_p.join()
    print 'process results length:', len(queue.get())

これはで正常に機能することを確認しましsize = 10000たが、でハングしworker_p.join()ますsize = 100000multiprocessing.Processインスタンスが入れることができるものに固有のサイズ制限はありmultiprocessing.Queueますか?それとも、ここで明らかな根本的な間違いを犯していますか?

参考までに、Ubuntu10.04でPython2.6.5を使用しています。

4

3 に答える 3

20

基になるパイプがいっぱいであるように見えるため、フィーダースレッドはパイプへの書き込みをブロックします(実際には、パイプを同時アクセスから保護するロックを取得しようとしたとき)。

この問題を確認してくださいhttp://bugs.python.org/issue8237

于 2012-04-05T12:58:34.917 に答える
2

Pythonマルチプロセッシングへの答え:一部の関数は、完了すると返されない(キューの素材が大きすぎる)ため、戻り値がキューに入れられる任意の関数セットの並列実行で「参加する前にデキューする」という意味が実装されます。

したがって、これにより、任意のサイズのものをキューに入れることができるため、見つけた制限が邪魔になりません。

于 2012-08-08T18:24:38.943 に答える
2

デフォルトでは、キューの最大サイズは無限ですが、それをオーバーライドしました。あなたの場合、worker_pはアイテムをキューに入れているので、joinを呼び出す前にキューを解放する必要があります。詳細については、以下のリンクを参照してください。 https://docs.python.org/2/library/multiprocessing.html#programming-guidelines

于 2015-06-27T18:47:59.150 に答える