7

Python 2.6 の新しい multiprocessing モジュールを試しています。それぞれ独自の multiprocessor.JoinableQueue インスタンスを持ついくつかのプロセスを作成しています。__init__各プロセスは、JoinableQueue インスタンス (各 Thread のメソッドを介して渡される) を共有する 1 つ以上のワーカー スレッド (threading.Thread のサブクラス) を生成します。通常は機能しているように見えますが、時折、予期せずに次のエラーで失敗します。

  File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run
    self.queue.task_done()
  File "C:\Python26\lib\multiprocessing\queues.py", line 293, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

私の Queue get() と task_done() 呼び出しは互いにすぐ後にあるので、それらは等しいはずです。逸話的に、これは get() と task_done() の間で行われる作業が非常に速い場合にのみ発生するようです。小さいtime.sleep(0.01)ものを挿入すると、問題が軽減されるようです。

何が起こっているのですか?従来のキュー (Queue.Queue) の代わりに、マルチプロセッサ キューをスレッドで使用できますか?

ありがとう!

-ブライアン

4

4 に答える 4

4

私はまだ 2.6 でマルチプロセッシングを試していませんでしたが、pyprocessing (2.5 で呼び出されていた) で多くのことをしました。

それぞれが一連のスレッドを生成する多数のプロセスを探していることがわかります。

multiprocessing モジュールを使用しているため、マルチスレッドではなくマルチプロセスを使用することをお勧めします。デッドロックなどの問題が少なくなります。

キュー オブジェクトを作成します。http://pyprocessing.berlios.de/doc/queue-objects.html

マルチ プロセス環境を作成するには、ワーカー プロセスを管理するプールを使用します: http://pyprocessing.berlios.de/doc/pool-objects.html 。次に、ワーカーに非同期/同期を適用し、必要に応じて各ワーカーにコールバックを追加することもできます。ただし、コールバックは一般的なコード ブロックであり、すぐに返される必要があることを忘れないでください (ドキュメントに記載されているとおり)。

追加情報: 必要に応じて、マネージャーhttp://pyprocessing.berlios.de/doc/manager-objects.htmlを作成して、キュー オブジェクトへのアクセスを管理します。これを行うには、キュー オブジェクトを共有する必要があります。ただし、共有して管理すると、プロキシ オブジェクトを作成することにより、ネットワーク全体でこの共有キューにアクセスできるという利点があります。これにより、集中型共有キュー オブジェクトのメソッドを (明らかに) 任意のネットワーク ノードのネイティブ メソッドとして呼び出すことができます。

これはドキュメントのコード例です

1 台のマシンでマネージャー サーバーを実行し、クライアントが他のマシンからそれを使用するようにすることができます (関連するファイアウォールで許可されている場合)。次のコマンドを実行すると、リモート クライアントが使用できる共有キューのサーバーが作成されます。

>>> from processing.managers import BaseManager, CreatorMethod
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy')
...
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
>>> m.serve_forever()

1 つのクライアントは、次のようにサーバーにアクセスできます。

>>> from processing.managers import BaseManager, CreatorMethod
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(typeid='get_proxy')
...
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
>>> queue = m.get_proxy()
>>> queue.put('hello')

安全なスレッド化を主張する場合、PEP371 (マルチプロセッシング) はこれを参照します http://code.google.com/p/python-safethread/

于 2008-12-05T02:08:18.530 に答える
2

ターゲットの引数として Queue オブジェクトを渡す必要があります。

multiprocessing のドキュメントの例:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

 if __name__ == '__main__':
     q = Queue()
     p = Process(target=f, args=(q,))
     p.start()
     print q.get()    # prints "[42, None, 'hello']"
     p.join()

キューはスレッドおよびプロセスセーフです。

于 2008-12-05T00:59:29.853 に答える
1

次のバグが発生している可能性があります。

http://bugs.python.org/issue4660

于 2010-02-07T19:43:37.820 に答える
-1

迅速な返答に感謝致します。あなたが示すように、私は multiprocessing.Queue インスタンスを引数として各プロセスに渡しています。障害はスレッドで発生しているようです。threading.Thread をサブクラス化し、各スレッド インスタンスの「 init 」メソッドにキューを渡すことで、それらを作成しています。これは、キューをスレッド サブクラスに渡す方法として受け入れられているようです。私は、マルチプロセッシング キューはスレッドと互換性がない可能性があると考えていました (ただし、スレッドセーフであると思われます)。

于 2008-12-05T01:12:12.683 に答える