4

すべての websocket 接続 (トルネード オープン コールバック内) が既存のデバイスzmq.SUBへのソケットを作成するアプリケーションがあります。zmq.FORWARDERzmq からコールバックとしてデータを受信し、それを websocket 接続を介してフロントエンド クライアントに中継することが考えられます。

https://gist.github.com/abhinavsingh/6378134

ws.py

import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()

from tornado.websocket import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
ioloop = IOLoop.instance()

class ZMQPubSub(object):

    def __init__(self, callback):
        self.callback = callback

    def connect(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.connect('tcp://127.0.0.1:5560')
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.callback)

    def subscribe(self, channel_id):
        self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)

class MyWebSocket(WebSocketHandler):

    def open(self):
        self.pubsub = ZMQPubSub(self.on_data)
        self.pubsub.connect()
        self.pubsub.subscribe("session_id")
        print 'ws opened'

    def on_message(self, message):
        print message

    def on_close(self):
        print 'ws closed'

    def on_data(self, data):
        print data

def main():
    application = Application([(r'/channel', MyWebSocket)])
    application.listen(10001)
    print 'starting ws on port 10001'
    ioloop.start()

if __name__ == '__main__':
    main()

フォワーダー.py

import zmq

def main():
    try:
        context = zmq.Context(1)

        frontend = context.socket(zmq.SUB)
        frontend.bind('tcp://*:5559')
        frontend.setsockopt(zmq.SUBSCRIBE, '')

        backend = context.socket(zmq.PUB)
        backend.bind('tcp://*:5560')

        print 'starting zmq forwarder'
        zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.exception(e)
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == '__main__':
    main()

パブリッシュ.py

import zmq

if __name__ == '__main__':
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect('tcp://127.0.0.1:5559')
    socket.send('session_id helloworld')
    print 'sent data for channel session_id'

しかし、私のZMQPubSubクラスはまったくデータを受け取っていないようです。

さらに実験を行った結果、内にコールバックioloop.IOLoop.instance().start()を登録してから呼び出す必要があることに気付きました。しかし、それは実行をブロックするだけです。on_recvZMQPubSub

main.ioloopインスタンスをコンストラクターに渡そうとしZMQStreamましたが、どちらも役に立ちません。

内のフローをブロックせずZMQStreamに既存のインスタンスにバインドできる方法はありますか?main.ioloopMyWebSocket.open

4

2 に答える 2

1

ZeroMq サブスクライバーは、受信したいメッセージをサブスクライブする必要があります。あなたのコードではそれがわかりません。Pythonのやり方はこれだと思います:

self.socket.setsockopt(zmq.SUBSCRIBE, "")
于 2013-08-28T13:31:32.697 に答える