1

Python で単純なクライアント サーバー アプリケーションを開発しています。マネージャを使用して共有キューを設定していますが、サーバーからクライアントに任意のオブジェクトを渡す方法がわかりません。manager.register 関数と関係があると思われますが、マルチプロセッシングのドキュメントではあまり説明されていません。そこにある唯一の例は、キューを使用し、他には何も使用していません。

これが私のコードです:

#manager demo.py
from multiprocessing import Process, Queue, managers
from multiprocessing.managers import SyncManager
import time

class MyObject():
    def __init__( self, p, f ):
        self.parameter = p
        self.processor_function = f

class MyServer():
    def __init__(self, server_info, obj):
        print '=== Launching Server ... ====='
        (ip, port, pw) = server_info
        self.object = obj       #Parameters for task processing

        #Define queues
        self._process_queue = Queue()       #Queue of tasks to be processed
        self._results_queue = Queue()       #Queue of processed tasks to be stored

        #Set up IS_Manager class and register server functions
        class IS_Manager(managers.BaseManager): pass
        IS_Manager.register('get_processQ', callable=self.get_process_queue)
        IS_Manager.register('get_resultsQ', callable=self.get_results_queue)
        IS_Manager.register('get_object', callable=self.get_object)

        #Initialize manager and server
        self.manager = IS_Manager(address=(ip, port), authkey=pw)
        self.server = self.manager.get_server()

        self.server_process = Process( target=self.server.serve_forever )
        self.server_process.start()

    def get_process_queue(self): return self._process_queue
    def get_results_queue(self): return self._results_queue
    def get_object(self): return self.object

    def runUntilDone(self, task_list):
        #Fill the initial queue
        for t in task_list:
            self._process_queue.put(t)

        #Main loop
        total_tasks = len(task_list)
        while not self._results_queue.qsize()==total_tasks:
            time.sleep(.5)
            print self._process_queue.qsize(), '\t', self._results_queue.qsize()
            if not self._results_queue.empty():
                print '\t', self._results_queue.get()
            #Do stuff
            pass

class MyClient():
    def __init__(self, server_info):
        (ip, port, pw) = server_info
        print '=== Launching Client ... ====='

        class IS_Manager(managers.BaseManager): pass

        IS_Manager.register('get_processQ')
        IS_Manager.register('get_resultsQ')
        IS_Manager.register('get_object')

        #Set up manager, pool
        print '\tConnecting to server...'
        manager = IS_Manager(address=(ip, port), authkey=pw)
        manager.connect()

        self._process_queue = manager.get_processQ()
        self._results_queue = manager.get_resultsQ()
        self.object = manager.get_object()

        print '\tConnected.'

    def runUntilDone(self):#, parameters):
        print 'Starting client main loop...'

        #Main loop
        while 1:
            if self._process_queue.empty():
                print 'I\'m bored here!'
                time.sleep(.5)
            else:
                task = self._process_queue.get()
                print task, '\t', self.object.processor_function( task, self.object.parameter )

        print 'Client process is quitting.  Bye!'
        self._clients_queue.get()

そしてシンプルなサーバー...

from manager_demo import *

def myProcessor( x, parameter ):
    return x + parameter

if __name__ == '__main__':
    my_object = MyObject( 100, myProcessor )
    my_task_list = range(1,20)
    my_server_info = ('127.0.0.1', 8081, 'my_pw')

    my_crawl_server = MyServer( my_server_info, my_object )
    my_crawl_server.runUntilDone( my_task_list )

そして単純なクライアント...

from manager_demo import *
if __name__ == '__main__':
    my_server_info = ('127.0.0.1', 8081, 'my_pw')
    my_client = MyClient( my_server_info )
    my_client.runUntilDone()

これを実行すると、次の場所でクラッシュします。

erin@Erin:~/Desktop$ python client.py 
=== Launching Client ... =====
    Connecting to server...
    Connected.
Starting client main loop...
2   Traceback (most recent call last):
  File "client.py", line 5, in <module>
    my_client.runUntilDone()
  File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone
    print task, '\t', self.object.processor_function( task, self.object.parameter )
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter'

Python がキューや processor_function で問題なく、オブジェクト パラメータでチョークするのはなぜですか? ありがとう!

4

1 に答える 1

2

parameterクラスの属性がMyObject()呼び出し可能ではないため、この問題が発生しています。

ドキュメントには、_exposed_このtypeidのプロキシとなるメソッド名のシーケンスを指定するために使用されると記載されています。公開リストが指定されていない場合、共有オブジェクトのすべての「パブリックメソッド」にアクセスできます。(ここで「パブリックメソッド」とは、__呼び出し__()メソッドを持ち、名前が「_」で始まらない属性を意味します。)

したがって、 :を変更することにより、おそらくメソッドとして、でparameter属性を手動で公開する必要があります。MyObjectMyObject()

class MyObject():
    def __init__(self, p, f):
        self._parameter = p
        self.processor_function = f

    def parameter(self):
        return self._parameter

また、タスクを次のように変更する必要があります。

 self.object.processor_function(task, self.object.parameter())

HTH。

于 2011-03-15T02:50:41.913 に答える