2

こんにちは私はPythonクラスでいくつかのZMQプルクライアントをラップする際にいくつかの問題を抱えています。これらのクラスは、マルチプロセッシングモジュールを介してサブプロセスでインスタンス化および呼び出されます。クライアントが関数の場合はすべて機能しますが、クラスの場合はpoller.poll()がハングします。

以下のコードには両方のバージョンがあります。1つは機能し、もう1つは機能しません。なんで?

import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            break
        time.sleep (1) 

def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(1)    


class Client:
    def __init__(self,port_push, port_sub):
        context = zmq.Context()
        self.socket_pull = context.socket(zmq.PULL)
        self.socket_pull.connect ("tcp://localhost:%s" % port_push)
        print "Connected to server with port %s" % port_push
        self.socket_sub = context.socket(zmq.SUB)
        self.socket_sub.connect ("tcp://localhost:%s" % port_sub)
        self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
        print "Connected to publisher with port %s" % port_sub
        # Initialize poll set


    def __call__(self):
        poller = zmq.Poller()
        poller.register(self.socket_pull, zmq.POLLIN)
        poller.register(self.socket_sub, zmq.POLLIN)
        # Work on requests from both server and publisher
        should_continue = True
        print "listening"
        while should_continue:
            print "hello"
            socks = dict(poller.poll())
            print poller
            if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN:
                message = self.socket_pull.recv()
                print "Recieved control command: %s" % message
                if message == "Exit": 
                    print "Recieved exit command, client will stop recieving messages"
                    should_continue = False

                if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN:
                    string = self.socket_sub.recv()
                    topic, messagedata = string.split()
                    print "Processing ... ", topic, messagedata

def client(port_push, port_sub):
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://localhost:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://localhost:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll())
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    #~ Process(target=client,args=(server_push_port,server_pub_port)).start()
    Process(target=Client(server_push_port,server_pub_port)).start()
4

1 に答える 1

2

編集1:これは完全に正しくありません...それを正しくするために少し時間をください...

Clientクラスを間違った方法で呼び出している可能性があります。私はこれに関する専門家ではありませんが、クライアントをProcessからサブクラス化してから、.start()関数を使用して実行する必要があると思います。したがって、Clientクラスを次のように定義します。

class Client(Process):
    def __init__(self, port_push, port_sub):
        (...) # your class init code here...make sure indentation is correct

次に、サーバーを実行する最後に、Clientクラスのインスタンスを作成し、次のように開始します。

client_class = Client(port_push, port_sub)
client_class.start()

Edit2:これが私のために働くfccoelhoのコードの編集されたバージョンです。

最大の問題は、ZMQの初期化を__call__、ではなくメソッドで行う必要があること__init__です。これは、マルチプロセッシングでメモリがどのように割り当てられているかによるものと思われます。__init__関数は親プロセスで実行されますが、__call__関数は、別のメモリスペースを使用して子プロセスで実行されます。どうやらZMQはこれが好きではありません。また、サーバーの準備が整う前にクライアントがサーバーに接続するのを防ぎ、クライアントがサブスクライブする前にサーバーがメッセージを送信しないようにするために、待機時間を追加しました。また、localhostの代わりに127.0.0.1を使用しています(私のコンピューターは何らかの理由でlocalhostを好みません)。また、クライアントのポーリング呼び出しに関する煩わしい印刷メッセージを削除し、クライアントがpubsubソケットでポーリング結果をチェックするインデントの問題を修正しました。

import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://127.0.0.1:%s" % port)
    print "Running server on port: ", port
    time.sleep(1.0)
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            print 'Push server sent "Exit" signal'
            break
        time.sleep(0.4) 

def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://127.0.0.1:%s" % port)
    socket.setsockopt(zmq.HWM, 1000)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    time.sleep(1.0)
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(0.4)    


class Client:
    def __init__(self,port_push, port_sub):
        self.port_push = port_push
        self.port_sub = port_sub
        # Initialize poll set

    def __call__(self):
        time.sleep(0.5)
        print 'hello from class client!'
        context = zmq.Context()
        self.socket_pull = context.socket(zmq.PULL)
        self.socket_pull.connect ("tcp://127.0.0.1:%s" % self.port_push)
        print "Connected to server with port %s" % self.port_push
        self.socket_sub = context.socket(zmq.SUB)
        self.socket_sub.connect ("tcp://127.0.0.1:%s" % self.port_sub)
        self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
        print "Connected to publisher with port %s" % self.port_sub

        poller = zmq.Poller()
        poller.register(self.socket_pull, zmq.POLLIN)
        poller.register(self.socket_sub, zmq.POLLIN)
        # Work on requests from both server and publisher
        should_continue = True
        print "listening"
        while should_continue:
            # print "hello"
            socks = dict(poller.poll())
            # print poller
            if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN:
                message = self.socket_pull.recv()
                print "Recieved control command: %s" % message
                if message == "Exit": 
                    print "Recieved exit command, client will stop recieving messages"
                    should_continue = False

            if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN:
                string = self.socket_sub.recv()
                topic, messagedata = string.split()
                print "Processing ... ", topic, messagedata

def client(port_push, port_sub):
    print 'hello from function client!'
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://127.0.0.1:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://127.0.0.1:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll(1000))
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    # Process(target=client,args=(server_push_port,server_pub_port)).start()
    Process(target=Client(server_push_port,server_pub_port)).start()

最後に、これは非常に必要最低限​​のマルチプロセスpubsubのよりクリーンな実装ですが、物事をより明確に示しています。

import zmq
from multiprocessing import Process
import time

class ServerPubSub(Process):
    def __init__(self, port, n):
        Process.__init__(self)
        self.port = port
        self.n = n

    def run(self):
        self.context = zmq.Context()
        self.pub = self.context.socket(zmq.PUB)
        self.pub.bind('tcp://127.0.0.1:%d' % self.port)
        self.pub.setsockopt(zmq.HWM, 1000)

        time.sleep(1)

        end = False
        for i in range(self.n):
            print 'SRV: sending message %d' % i
            self.pub.send('Message %d' % i)
            print 'SRV: message %d sent' % i
            time.sleep(0.2)

        self.pub.close()

class ClientPubSub(Process):
    def __init__(self, port, n):
        Process.__init__(self)
        self.port = port
        self.n = n

    def run(self):
        self.context = zmq.Context()
        self.sub = self.context.socket(zmq.SUB)
        self.sub.connect('tcp://127.0.0.1:%d' % self.port)
        self.sub.setsockopt(zmq.SUBSCRIBE, '')
        self.poller = zmq.Poller()
        self.poller.register(self.sub, zmq.POLLIN)

        end = False
        count = 0
        while count < self.n:
            ready = dict(self.poller.poll(0))
            if self.sub in ready and ready[self.sub] == zmq.POLLIN:
                msg = self.sub.recv()
                print 'CLI: received message "%s"' % msg
                count += 1

        self.sub.close()

if __name__ == "__main__":
    port = 5000
    n = 10
    server = ServerPubSub(port, n)
    client = ClientPubSub(port, n)

    server.start()
    client.start()

    server.join()
    client.join()
于 2012-03-19T13:21:36.183 に答える