6

Python の multiprocessing モジュールに基づいて、次のことを行う必要があります。

-特定のイベントによって中断される可能性のある実行中のプロセスを作成します。

-このプロセスでは、クライアントからメッセージを受信し、このメッセージをオブジェクト インスタンスのハンドラー メソッドに渡します。

ベースコードは以下です(詳細は一部省略)。問題は、インスタンス メソッド (self.enroll(message)) を呼び出そうとすることですが、期待どおりに効果がありません。理由はわかっています。プロセスは独自のメモリなどを使用します。python のマルチプロセッシング Pool.map()を使用してバインドされたメソッドをピクルする問題を解決するときに <type 'instancemethod'> をピクルできず、Manager、Queue、Pool を使用してさまざまなアプローチを試しました...どれも機能しなかったため、私はすることにしました私の意図を理解できるように、コードをできるだけ「生のまま」にしてください。

class DistManager:
    def __init__(self, name, network_address, password):
        self.name = name
        self.network_address = network_address
        self.password = password
        self.distribution_clients = {}

    def _run_distribution_process(self):
        import select
        while not self.should_stop_distribution_service.is_set():
            (sread, swrite, sexc) = select.select([self.distribution_listener], [], [], 0)
            if (sread):
                connection = self.distribution_listener.accept()
                serialized_message = connection.recv()  # currently only receiving
                connection.close()
                message = pickle.loads(serialized_message)
                self.enroll(message)  # THE PROBLEM IS HERE

    def start_distribution_service(self, distribution_port):
        self.distribution_port = distribution_port
        # patch for making Listener work with select.select during run
        Listener.fileno = lambda self: self._listener._socket.fileno()
        self.distribution_listener = Listener(address=(self.network_address, self.distribution_port),
                                              authkey=self.password)
        self.should_stop_distribution_service = Event()
        self.distribution_process = Process(name='Distribution Runner', target=self._run_distribution_process)
        self.distribution_process.daemon = True
        self.distribution_process.start()

    def stop_distribution_service(self):
        from time import sleep
        self.should_stop_distribution_service.set()
        sleep(1)
        self.distribution_listener.close()
        self.distribution_process.terminate()
        return self.distribution_process.exitcode

    def _enroll_distribution_client(self, identifier, network_address, phone_number):
        self.distribution_clients[identifier] = (network_address, phone_number)

    def enroll(self, message):
        if type(message.content) is tuple:
            self._enroll_distribution_client(message.generator_identifier, message.content[0], message.content[1])
        else:
            raise TypeError("Tuple expected")
        return message.code
4

1 に答える 1

1

これには、ピクリングエラーなしで multiprocessing.pool を引き続き使用できます。

クラスでマルチプロセッシングを行うコードに次の行を追加しても、プールを介してメソッドを渡すことができます。コードはクラスの上にある必要があります

import copy_reg
    import types

    def _reduce_method(meth):
        return (getattr,(meth.__self__,meth.__func__.__name__))
    copy_reg.pickle(types.MethodType,_reduce_method)

メソッドをピクルする方法の詳細については、以下を参照してくださいhttp://docs.python.org/2/library/copy_reg.html

于 2013-11-08T14:40:41.333 に答える