1

個別に実行される を作成zmq_forwarder.pyし、アプリからのメッセージを sockJS 接続に渡します。現在、フラスコ アプリが zmq 経由で sockJS からメッセージを受信する方法に取り組んでいます。私の内容を貼り付けていzmq_forwarder.pyます。ZMQ は初めてで、実行するたびに 100% の CPU 負荷を使用する理由がわかりません。

import zmq

# Prepare our context and sockets
context = zmq.Context()

receiver_from_server = context.socket(zmq.PULL)
receiver_from_server.bind("tcp://*:5561")

forwarder_to_server = context.socket(zmq.PUSH)
forwarder_to_server.bind("tcp://*:5562")

receiver_from_websocket = context.socket(zmq.PULL)
receiver_from_websocket.bind("tcp://*:5563")

forwarder_to_websocket = context.socket(zmq.PUSH)
forwarder_to_websocket.bind("tcp://*:5564")

# Process messages from both sockets
# We prioritize traffic from the server
while True:

    # forward messages from the server
    while True:
        try:
            message = receiver_from_server.recv(zmq.DONTWAIT)
        except zmq.Again:
            break

        print "Received from server: ", message
        forwarder_to_websocket.send_string(message)

    # forward messages from the websocket
    while True:
        try:
            message = receiver_from_websocket.recv(zmq.DONTWAIT)
        except zmq.Again:
            break

        print "Received from websocket: ", message
        forwarder_to_server.send_string(message)

ご覧のとおり、4 つのソケットをセットアップしました。アプリはポート 5561 に接続してデータを zmq にプッシュし、ポート 5562 に接続して zmq から受信します (ただし、zmq によって送信されたメッセージをリッスンするように実際に設定する方法をまだ考えています)。一方、sockjs はポート 5564 で zmq からデータを受信し、ポート 5563 でデータを送信します。

zmq.DONTWAITメッセージの受信を非同期でノンブロッキングにすることを読んだので、追加しました。

CPU をオーバーロードしないようにコードを改善する方法はありますか? 目標は、zmq を使用してフラスコ アプリと websocket の間でメッセージをやり取りできるようにすることです。

4

1 に答える 1

5

ブロック ( ) なしで、タイトなループで 2 つのレシーバーソケットをポーリングしているzmq.DONTWAITため、必然的に CPU が最大になります。

ZMQ では、単一のスレッドで複数のソケットをポーリングするためのサポートがいくつかあることに注意してください。この回答を参照してください。poller.poll(millis)大量の着信メッセージがある場合にのみコードが大量の CPU を使用し、それ以外の場合はアイドル状態になるように、タイムアウトを調整できると思います。

もう 1 つのオプションは、コールバックを使用して、ZMQ イベント ループを使用して着信メッセージに非同期的に応答することです。このトピックに関するPyZMQ のドキュメントを参照してください。次の「エコー」の例が適用されています。

# set up the socket, and a stream wrapped around the socket
s = ctx.socket(zmq.REP)
s.bind('tcp://localhost:12345')
stream = ZMQStream(s)

# Define a callback to handle incoming messages
def echo(msg):
    # in this case, just echo the message back again
    stream.send_multipart(msg)

# register the callback
stream.on_recv(echo)

# start the ioloop to start waiting for messages
ioloop.IOLoop.instance().start()
于 2014-02-21T14:37:05.313 に答える