5

管理プロセスでデータキューサーバーを起動しようとしています(後でサービスに変換できるようにするため)。データキューサーバー機能はメインプロセスでは正常に機能しますが、を使用して作成されたプロセスでは機能しません。 multiprocessing.Process。

dataQueueServerおよびdataQueueClientコードは、ここにあるマルチプロセッシングモジュールのドキュメントのコードに基づいています。

単独で実行する場合、dataQueueServerは適切に機能します。ただし、mpquueuemultiprocessing.Processで'sstart()を使用して実行すると、機能しません(クライアントでテストした場合)。両方のケースをテストするために、変更なしでdataQueueClientを使用しています。

どちらの場合もコードは到達するので、サーバーは機能していると思いますが、 mpqueueserve_foreverの場合、サーバーがクライアントに通信するのをブロックしています。

パーツを実行するループをserve_forever()スレッドの下に配置して、停止できるようにしました。

コードは次のとおりです。

mpqueue#これは、子プロセスでサーバーを生成しようとする「マネージャー」プロセスです。

import time
import multiprocessing
import threading
import dataQueueServer

class Printer():
    def __init__(self):
        self.lock = threading.Lock()
    def tsprint(self, text):
        with self.lock:
            print text

class QueueServer(multiprocessing.Process):
    def __init__(self, name = '', printer = None):
        multiprocessing.Process.__init__(self)
        self.name = name
        self.printer = printer
        self.ml = dataQueueServer.MainLoop(name = 'ml', printer = self.printer)

    def run(self):
        self.printer.tsprint(self.ml)
        self.ml.start()

    def stop(self):
        self.ml.stop()

if __name__ == '__main__':
    printer = Printer()
    qs = QueueServer(name = 'QueueServer', printer =  printer)
    printer.tsprint(qs)
    printer.tsprint('starting')
    qs.start()
    printer.tsprint('started.')
    printer.tsprint('Press Ctrl-C to quit')
    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        printer.tsprint('\nTrying to exit cleanly...')
        qs.stop()

    printer.tsprint('stopped')

dataQueueServer

import time
import threading

from multiprocessing.managers import BaseManager
from multiprocessing import Queue

HOST = ''
PORT = 50010
AUTHKEY = 'authkey'

## Define some helper functions for use by the main process loop
class Printer():
    def __init__(self):
        self.lock = threading.Lock()
    def tsprint(self, text):
        with self.lock:
            print text



class QueueManager(BaseManager): 
    pass


class MainLoop(threading.Thread):
    """A thread based loop manager, allowing termination signals to be sent
    to the thread"""
    def __init__(self, name = '', printer = None):
        threading.Thread.__init__(self)
        self._stopEvent = threading.Event()
        self.daemon = True
        self.name = name

        if printer is None:
            self.printer = Printer()
        else:
            self.printer = printer

        ## create the queue
        self.queue = Queue()
        ## Add a function to the handler to return the queue to clients
        self.QM = QueueManager

        self.QM.register('get_queue', callable=lambda:self.queue)
        self.queue_manager = self.QM(address=(HOST, PORT), authkey=AUTHKEY)
        self.queue_server = self.queue_manager.get_server()

    def __del__(self):
        self.printer.tsprint( 'closing...')


    def run(self):
        self.printer.tsprint( '{}: started serving'.format(self.name))
        self.queue_server.serve_forever()


    def stop(self):
        self.printer.tsprint ('{}: stopping'.format(self.name))
        self._stopEvent.set()

    def stopped(self):
        return self._stopEvent.isSet()

def start():
    printer = Printer() 
    ml = MainLoop(name = 'ml', printer = printer)
    ml.start()
    return ml

def stop(ml):
    ml.stop()

if __name__ == '__main__':
    ml = start()
    raw_input("\nhit return to stop")
    stop(ml)

そしてクライアント:

dataQueueClient

import datetime
from multiprocessing.managers import BaseManager


n = 0
N = 10**n

HOST = ''
PORT = 50010
AUTHKEY = 'authkey'


def now():
    return datetime.datetime.now()

def gen(n, func, *args, **kwargs):
    k = 0
    while k < n:
        yield func(*args, **kwargs)
        k += 1

class QueueManager(BaseManager): 
    pass
QueueManager.register('get_queue')
m = QueueManager(address=(HOST, PORT), authkey=AUTHKEY)
m.connect()
queue = m.get_queue()

def load(msg, q):
    return q.put(msg)

def get(q):
    return q.get()

lgen = gen(N, load, msg = 'hello', q = queue)
t0 = now()
while True:
    try:
        lgen.next()
    except StopIteration:
        break
t1 = now()
print 'loaded %d items in ' % N, t1-t0

t0 = now()
while queue.qsize() > 0:
    queue.get()
t1 = now()
print 'got %d items in ' % N, t1-t0
4

1 に答える 1

9

したがって、解決策は十分に単純なようです。を使用せずserve_forever()manager.start()代わりに使用してください。

Eli Benderskyによると、BaseManager(およびその拡張バージョンSyncManager)はすでに新しいプロセスでサーバーを生成しています(そしてmultiprocessing.managersコードを見るとこれが確認されます)。私が経験している問題は、サーバーがメインプロセスで起動される例で使用されているフォームに起因します。

子プロセスで実行したときに現在の例が機能しない理由はまだわかりませんが、それはもはや問題ではありません。

複数のキューサーバーを管理するための動作する(そしてOPから大幅に簡略化された)コードは次のとおりです。

サーバー

from multiprocessing import Queue
from multiprocessing.managers import SyncManager

HOST = ''
PORT0 = 5011
PORT1 = 5012
PORT2 = 5013
AUTHKEY = 'authkey'

name0 = 'qm0'
name1 = 'qm1'
name2 = 'qm2'

description = 'Queue Server'

def CreateQueueServer(HOST, PORT, AUTHKEY, name = None, description = None):
    name = name
    description = description
    q = Queue()

    class QueueManager(SyncManager):
        pass


    QueueManager.register('get_queue', callable = lambda: q)
    QueueManager.register('get_name', callable = name)
    QueueManager.register('get_description', callable = description)
    manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
    manager.start() # This actually starts the server

    return manager

# Start three queue servers
qm0 = CreateQueueServer(HOST, PORT0, AUTHKEY, name0, description)
qm1 = CreateQueueServer(HOST, PORT1, AUTHKEY, name1, description)
qm2 = CreateQueueServer(HOST, PORT2, AUTHKEY, name2, description)

raw_input("return to end")

クライアント

from multiprocessing.managers import SyncManager

HOST = ''
PORT0 = 5011
PORT1 = 5012
PORT2 = 5013
AUTHKEY = 'authkey'

def QueueServerClient(HOST, PORT, AUTHKEY):
    class QueueManager(SyncManager):
        pass
    QueueManager.register('get_queue')
    QueueManager.register('get_name')
    QueueManager.register('get_description')
    manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
    manager.connect() # This starts the connected client
    return manager

# create three connected managers
qc0 = QueueServerClient(HOST, PORT0, AUTHKEY)
qc1 = QueueServerClient(HOST, PORT1, AUTHKEY)
qc2 = QueueServerClient(HOST, PORT2, AUTHKEY)
# Get the queue objects from the clients
q0 = qc0.get_queue()
q1 = qc1.get_queue()
q2 = qc2.get_queue()
# put stuff in the queues
q0.put('some stuff')
q1.put('other stuff')
q2.put({1:123, 2:'abc'})
# check their sizes
print 'q0 size', q0.qsize()
print 'q1 size', q1.qsize()
print 'q2 size', q2.qsize()

# pull some stuff and print it
print q0.get()
print q1.get()
print q2.get()

実行中のキューサーバーの情報と辞書を共有するサーバーを追加して、消費者がそのモデルを使用してどこで利用できるかを簡単に判断できるようにします。ただし、共有辞書には通常の辞書とは少し異なる構文が必要であることに注意してください。機能しdictionary[0] = somethingません。このディクショナリに接続されている他のすべてのクライアントに伝播する構文を使用する必要がありdictionary.update([(key, value), (otherkey, othervalue)])ます。dictionary.get(key)

于 2012-07-18T21:18:24.673 に答える