すべての websocket 接続 (トルネード オープン コールバック内) が既存のデバイスzmq.SUB
へのソケットを作成するアプリケーションがあります。zmq.FORWARDER
zmq からコールバックとしてデータを受信し、それを websocket 接続を介してフロントエンド クライアントに中継することが考えられます。
https://gist.github.com/abhinavsingh/6378134
ws.py
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
from tornado.websocket import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
ioloop = IOLoop.instance()
class ZMQPubSub(object):
def __init__(self, callback):
self.callback = callback
def connect(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect('tcp://127.0.0.1:5560')
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.callback)
def subscribe(self, channel_id):
self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)
class MyWebSocket(WebSocketHandler):
def open(self):
self.pubsub = ZMQPubSub(self.on_data)
self.pubsub.connect()
self.pubsub.subscribe("session_id")
print 'ws opened'
def on_message(self, message):
print message
def on_close(self):
print 'ws closed'
def on_data(self, data):
print data
def main():
application = Application([(r'/channel', MyWebSocket)])
application.listen(10001)
print 'starting ws on port 10001'
ioloop.start()
if __name__ == '__main__':
main()
フォワーダー.py
import zmq
def main():
try:
context = zmq.Context(1)
frontend = context.socket(zmq.SUB)
frontend.bind('tcp://*:5559')
frontend.setsockopt(zmq.SUBSCRIBE, '')
backend = context.socket(zmq.PUB)
backend.bind('tcp://*:5560')
print 'starting zmq forwarder'
zmq.device(zmq.FORWARDER, frontend, backend)
except KeyboardInterrupt:
pass
except Exception as e:
logger.exception(e)
finally:
frontend.close()
backend.close()
context.term()
if __name__ == '__main__':
main()
パブリッシュ.py
import zmq
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect('tcp://127.0.0.1:5559')
socket.send('session_id helloworld')
print 'sent data for channel session_id'
しかし、私のZMQPubSub
クラスはまったくデータを受け取っていないようです。
さらに実験を行った結果、内にコールバックioloop.IOLoop.instance().start()
を登録してから呼び出す必要があることに気付きました。しかし、それは実行をブロックするだけです。on_recv
ZMQPubSub
main.ioloop
インスタンスをコンストラクターに渡そうとしZMQStream
ましたが、どちらも役に立ちません。
内のフローをブロックせずZMQStream
に既存のインスタンスにバインドできる方法はありますか?main.ioloop
MyWebSocket.open