1

モジュールのリモートマネージャ機能を使って、multiprocessing多くのマシンに作業を分散したい。サードパーティのモジュールがあることは知っていますが、できるだけコアにこだわりたいと思っています。デスクトップ (単一マシン) の場合、multiprocessing.Poolクラスを使用して CPU の数を制限できることはわかっていますが、リモート マネージャーについていくつか質問があります。

リモートマネージャー用に次のコードがあります。

   from multiprocessing.managers import BaseManager
   import Queue
   queue = Queue.Queue()
   class QueueManager(BaseManager): pass
   QueueManager.register('get_queue', callable=lambda:queue)
   m = QueueManager(address=('', 50000), authkey='abracadabra')
   s = m.get_server()
   s.serve_forever()

これはうまく機能し、次のコードを使用してキューにジョブを送信することもできます。

QueueManager.register('get_queue')
m = QueueManager(address=('machinename', 50000), authkey='abracadabra')
m.connect()
queue = m.get_queue()
queue.put('hello')

queue.get()キュー内の単一のエントリを取得することもできます。

  1. キュー内のアイテムをどのように取得しますか? キューを繰り返し処理しようとすると、無限ループに入ります。
  2. ワーカーで、各マシンをマシンごとに 1 つのジョブに制限できますか?
  3. このメソッドは、ワーカーがジョブが存在するかどうかを調べる必要があるプル メソッドのようですが、マルチプロセッシング サーバーをトリガーできるプッシュ メソッドはありますか?
4

1 に答える 1

1

キューを反復することは、次のことと同じです。

while True:
    elem = queue.get()  #queue empty -> it blocks!!!

キューを「繰り返し」、実行するジョブがなくなったときにワーカープロセスをブロックするエレガントな方法は、None(または他の何かを)番兵として使用し、次を使用することiter(callable, sentinel)です。

for job in iter(queue.get, None):
    # execute the calculation
    output_queue.put(result)

#shutdown the worker process

これは次と同等です:

while True:
    job = queue.get()
    if job is None:
        break
    #execute the calculation
    output_queue.put(result)
#shutdown the worker process

各ワーカーサブプロセスの番兵をキューに挿入する必要があることに注意してください。そうしないと、それを待っているサブプロセスが存在します。

2番目の質問については、あなたが何を求めているのかわかりません。は、クライアントからの呼び出しを実行する1 つのBaseManagerサーバーを提供するため、明らかに、すべての要求は同じマシンによって満たされます。それとも、各クライアントにリクエストのみを許可するということですか? 「手動で」実装できますが、これにはオプションがありません。

あなたの質問がわかりません。プル方式とは「マルチプロセッシングサーバーをトリガーできるプッシュメソッド」とはどういう意味かについて、もう少し詳しく質問を言い換えることができますか?

于 2013-07-08T17:10:32.610 に答える