14

Redis と Tornado を非同期で使用する方法を見つけようとしています。tornado-redisを見つけましたが、コードに a を追加するだけでは不十分yieldです。

次のコードがあります。

import redis
import tornado.web

class WaiterHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self):
        client = redis.StrictRedis(port=6279)
        pubsub = client.pubsub()
        pubsub.subscribe('test_channel')

        for item in pubsub.listen():
            if item['type'] == 'message':
                print item['channel']
                print item['data']

        self.write(item['data'])
        self.finish()


class GetHandler(tornado.web.RequestHandler):

    def get(self):
        self.write("Hello world")


application = tornado.web.Application([
    (r"/", GetHandler),
    (r"/wait", WaiterHandler),
])

if __name__ == '__main__':
    application.listen(8888)
    print 'running'
    tornado.ioloop.IOLoop.instance().start()

/で保留中のリクエストがある間に、URLにアクセスして「Hello World」を取得する必要があります/wait。どうすればいいですか?

4

4 に答える 4

7

IO ループをブロックするため、メインの Tornado スレッドで Redis pub/sub を使用しないでください。メイン スレッドで Web クライアントからのロング ポーリングを処理できますが、Redis をリッスンする別のスレッドを作成する必要があります。メッセージを受信したときに、ioloop.add_callback()and/or aを使用してメイン スレッドと通信できます。threading.Queue

于 2013-03-01T14:55:23.183 に答える
5

Tornado IOLoop 互換の redis クライアントを使用する必要があります。

toredisbrukvaなど、利用できるものはほとんどありません。

toredis の pubsub の例を次に示します: https://github.com/mrjoes/toredis/blob/master/tests/test_handler.py

于 2013-03-01T15:58:00.240 に答える
4

Python >= 3.3 の場合、aioredisを使用することをお勧めします。以下のコードはテストしませんでしたが、次のようになるはずです。

import redis
import tornado.web
from tornado.web import RequestHandler

import aioredis
import asyncio
from aioredis.pubsub import Receiver


class WaiterHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self):
        client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop)

        ch = redis.channels['test_channel']
        result = None
        while await ch.wait_message():
            item = await ch.get()
            if item['type'] == 'message':
                print item['channel']
                print item['data']
                result = item['data']

        self.write(result)
        self.finish()


class GetHandler(tornado.web.RequestHandler):

    def get(self):
        self.write("Hello world")


application = tornado.web.Application([
    (r"/", GetHandler),
    (r"/wait", WaiterHandler),
])

if __name__ == '__main__':
    print 'running'
    tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
    server = tornado.httpserver.HTTPServer(application)
    server.bind(8888)
    # zero means creating as many processes as there are cores.
    server.start(0)
    tornado.ioloop.IOLoop.instance().start()
于 2017-02-23T14:41:30.943 に答える
1

では、get リクエストを使ってそれを行う方法の例を次に示します。

2 つの主要コンポーネントを追加しました。

1 つ目は、新しいメッセージをローカル リスト オブジェクトに追加する単純なスレッド化された pubsub リスナーです。また、リスト アクセサーをクラスに追加したので、通常のリストから読み取るかのように、リスナー スレッドから読み取ることができます。あなたWebRequestに関する限り、ローカル リスト オブジェクトからデータを読み取っているだけです。これはすぐに戻り、現在のリクエストの完了や将来のリクエストの受け入れと処理をブロックしません。

class OpenChannel(threading.Thread):
    def __init__(self, channel, host = None, port = None):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
        self.pubsub = self.redis.pubsub()
        self.pubsub.subscribe(channel)

        self.output = []

    # lets implement basic getter methods on self.output, so you can access it like a regular list
    def __getitem__(self, item):
        with self.lock:
            return self.output[item]

    def __getslice__(self, start, stop = None, step = None):
        with self.lock:
            return self.output[start:stop:step]

    def __str__(self):
        with self.lock:
            return self.output.__str__()

    # thread loop
    def run(self):
        for message in self.pubsub.listen():
            with self.lock:
                self.output.append(message['data'])

    def stop(self):
        self._Thread__stop()

2 つ目は ApplicationMixin クラスです。これは、機能と属性を追加するために Web リクエスト クラスに継承させるセカンダリ オブジェクトです。この場合、要求されたチャネルのチャネル リスナーが既に存在するかどうかを確認し、見つからない場合は作成し、リスナー ハンドルを WebRequest に返します。

# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
    def GetChannel(self, channel, host = None, port = None):
        if channel not in self.application.channels:
            self.application.channels[channel] = OpenChannel(channel, host, port)
            self.application.channels[channel].start()
        return self.application.channels[channel]

WebRequest クラスは、リスナーを静的リストであるかのように扱うようになりました (self.write文字列を指定する必要があることに注意してください) 。

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
    @tornado.web.asynchronous
    def get(self, channel):
        # get the channel
        channel = self.GetChannel(channel)
        # write out its entire contents as a list
        self.write('{}'.format(channel[:]))
        self.finish() # not necessary?

最後に、アプリケーションを作成した後、空の辞書を属性として追加しました

# add a dictionary containing channels to your application
application.channels = {}

アプリケーションを終了すると、実行中のスレッドのクリーンアップと同様に

# clean up the subscribed channels
for channel in application.channels:
    application.channels[channel].stop()
    application.channels[channel].join()

完全なコード:

import threading
import redis
import tornado.web



class OpenChannel(threading.Thread):
    def __init__(self, channel, host = None, port = None):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
        self.pubsub = self.redis.pubsub()
        self.pubsub.subscribe(channel)

        self.output = []

    # lets implement basic getter methods on self.output, so you can access it like a regular list
    def __getitem__(self, item):
        with self.lock:
            return self.output[item]

    def __getslice__(self, start, stop = None, step = None):
        with self.lock:
            return self.output[start:stop:step]

    def __str__(self):
        with self.lock:
            return self.output.__str__()

    # thread loop
    def run(self):
        for message in self.pubsub.listen():
            with self.lock:
                self.output.append(message['data'])

    def stop(self):
        self._Thread__stop()


# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
    def GetChannel(self, channel, host = None, port = None):
        if channel not in self.application.channels:
            self.application.channels[channel] = OpenChannel(channel, host, port)
            self.application.channels[channel].start()
        return self.application.channels[channel]

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
    @tornado.web.asynchronous
    def get(self, channel):
        # get the channel
        channel = self.GetChannel(channel)
        # write out its entire contents as a list
        self.write('{}'.format(channel[:]))
        self.finish() # not necessary?


class GetHandler(tornado.web.RequestHandler):

    def get(self):
        self.write("Hello world")


application = tornado.web.Application([
    (r"/", GetHandler),
    (r"/channel/(?P<channel>\S+)", ReadChannel),
])


# add a dictionary containing channels to your application
application.channels = {}


if __name__ == '__main__':
    application.listen(8888)
    print 'running'
    try:
        tornado.ioloop.IOLoop.instance().start()
    except KeyboardInterrupt:
        pass

    # clean up the subscribed channels
    for channel in application.channels:
        application.channels[channel].stop()
        application.channels[channel].join()
于 2013-03-01T19:02:21.417 に答える