4

Django アプリにServer Sent Event APIを実装して、リアルタイムの更新をバックエンドからブラウザーにストリーミングしました。バックエンドは Redis pubsub です。私の Django ビューは次のようになります。

def event_stream(request):
   """
   Stream worker events out to browser.
   """

   listener = events.Listener(
       settings.EVENTS_PUBSUB_URL,
       channels=[settings.EVENTS_PUBSUB_CHANNEL],
       buffer_key=settings.EVENTS_BUFFER_KEY,
       last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
   )

   return http.HttpResponse(listener, mimetype='text/event-stream')

そして、イテレータとして返す events.Listener クラスは次のようになります。

class Listener(object):
    def __init__(self, rcon_or_url, channels, buffer_key=None,
                 last_event_id=None):
        if isinstance(rcon_or_url, redis.StrictRedis):
            self.rcon = rcon_or_url
        elif isinstance(rcon_or_url, basestring):
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
        self.channels = channels
        self.buffer_key = buffer_key
        self.last_event_id = last_event_id
        self.pubsub = self.rcon.pubsub()
        self.pubsub.subscribe(channels)

    def __iter__(self):
        # If we've been initted with a buffer key, then get all the events off
        # that and spew them out before blocking on the pubsub.
        if self.buffer_key:
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)

            # check whether msg with last_event_id is still in buffer.  If so,
            # trim buffered_events to have only newer messages.
            if self.last_event_id:
                # Note that we're looping through most recent messages first,
                # here
                counter = 0
                for msg in buffered_events:
                    if (json.loads(msg)['id'] == self.last_event_id):
                        break
                    counter += 1
                buffered_events = buffered_events[:counter]

            for msg in reversed(list(buffered_events)):
                # Stream out oldest messages first
                yield to_sse({'data': msg})
        try:
            for msg in self.pubsub.listen():
                if msg['type'] == 'message':
                    yield to_sse(msg)
        finally:
            logging.info('Closing pubsub')
            self.pubsub.close()
            self.rcon.connection_pool.disconnect()

このセットアップで、イベントをブラウザーに正常にストリーミングできます。ただし、リスナーの「最終的に」の切断呼び出しは、実際には呼び出されないようです。pubsub からメッセージが来るのを待って、彼らはまだ野宿していると思います。クライアントが切断して再接続すると、Redis インスタンスへの接続数が増加し、減少することはありません。約 1000 になると、Redis は異常を起こし始め、利用可能なすべての CPU を消費します。

クライアントがリッスンしなくなったことを検出し、その時点で Redis 接続を閉じられるようにしたいと考えています。

私が試したり考えたりしたこと:

  1. 接続プール。しかし、redis-py README が述べているように、「PubSub または Pipeline オブジェクトをスレッド間で渡すことは安全ではありません。」
  2. 接続、または単に切断を処理するミドルウェア。ミドルウェアの process_response() メソッドがあまりにも早く (http ヘッダーがクライアントに送信される前に) 呼び出されるため、これは機能しません。コンテンツをストリーミングしている最中にクライアントが切断されたときに呼び出されるものが必要です。
  3. request_finishedおよびgot_request_exceptionシグナル。最初のものは、ミドルウェアの process_response() のように、起動が早すぎるようです。2 番目は、クライアントがストリームの途中で切断されたときに呼び出されません。

最終的なしわ: 本番環境では Gevent を使用しているため、一度に多くの接続を開いたままにしておくことができます。ただし、この接続リークの問題は、単純な古い「manage.py runserver」、Gevent のモンキーパッチを適用した runserver、または Gunicorn の gevent ワーカーを使用している場合に発生します。

4

1 に答える 1

1

更新: Django 1.5 の時点で、この質問/回答で行っているように物事を遅延してストリーミングしたい場合は、 StreamingHttpResponse インスタンスを返す必要があります。

以下の元の回答

いろいろなことを調べたり、フレームワークのコードを読んだりした結果、この質問に対する正しい答えだと思うものを見つけました。

  1. WSGI PEPによると、アプリケーションが close() メソッドで反復子を返す場合、応答が終了したら WSGI サーバーによって呼び出される必要があります。Django もこれをサポートしています。これは、私が必要とする Redis 接続のクリーンアップを行う自然な場所です。
  2. Python の wsgiref 実装にバグがあり、Django の「runserver」の拡張により、クライアントがストリームの途中でサーバーから切断された場合に close() がスキップされます。パッチを提出しました。
  3. サーバーが close() を受け入れたとしても、クライアントへの書き込みが実際に失敗するまで呼び出されません。イテレータが pubsub の待機中にブロックされ、何も送信しない場合、close() は呼び出されません。クライアントが接続するたびに、ノーオペレーション メッセージを pubsub に送信することで、この問題を回避しました。そうすれば、ブラウザが通常の再接続を行うと、現在機能していないスレッドは閉じられた接続に書き込みを試み、例外をスローし、サーバーが close() を呼び出すとクリーンアップされます。SSEの仕様では、コロンで始まる行はすべて無視すべきコメントであると規定されているため、古いクライアントをフラッシュするための no-op メッセージとして ":\n" を送信するだけです。

これが新しいコードです。まず Django ビュー:

def event_stream(request):
    """
    Stream worker events out to browser.
    """
    return events.SSEResponse(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
    )

そして、SSE をフォーマットするヘルパー関数と、ビューを少しきれいにする HTTPResponse サブクラスと共に、作業を行う Listener クラス:

class Listener(object):
    def __init__(self,
                 rcon_or_url=settings.EVENTS_PUBSUB_URL,
                 channels=None,
                 buffer_key=settings.EVENTS_BUFFER_KEY,
                 last_event_id=None):
        if isinstance(rcon_or_url, redis.StrictRedis):
            self.rcon = rcon_or_url
        elif isinstance(rcon_or_url, basestring):
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
        if channels is None:
            channels = [settings.EVENTS_PUBSUB_CHANNEL]
        self.channels = channels
        self.buffer_key = buffer_key
        self.last_event_id = last_event_id
        self.pubsub = self.rcon.pubsub()
        self.pubsub.subscribe(channels)

        # Send a superfluous message down the pubsub to flush out stale
        # connections.
        for channel in self.channels:
            # Use buffer_key=None since these pings never need to be remembered
            # and replayed.
            sender = Sender(self.rcon, channel, None)
            sender.publish('_flush', tags=['hidden'])

    def __iter__(self):
        # If we've been initted with a buffer key, then get all the events off
        # that and spew them out before blocking on the pubsub.
        if self.buffer_key:
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)

            # check whether msg with last_event_id is still in buffer.  If so,
            # trim buffered_events to have only newer messages.
            if self.last_event_id:
                # Note that we're looping through most recent messages first,
                # here
                counter = 0
                for msg in buffered_events:
                    if (json.loads(msg)['id'] == self.last_event_id):
                        break
                    counter += 1
                buffered_events = buffered_events[:counter]

            for msg in reversed(list(buffered_events)):
                # Stream out oldest messages first
                yield to_sse({'data': msg})

        for msg in self.pubsub.listen():
            if msg['type'] == 'message':
                yield to_sse(msg)

    def close(self):
        self.pubsub.close()
        self.rcon.connection_pool.disconnect()


class SSEResponse(HttpResponse):
    def __init__(self, rcon_or_url, channels, buffer_key=None,
                 last_event_id=None, *args, **kwargs):
        self.listener = Listener(rcon_or_url, channels, buffer_key,
                                 last_event_id)
        super(SSEResponse, self).__init__(self.listener,
                                          mimetype='text/event-stream',
                                          *args, **kwargs)

    def close(self):
        """
        This will be called by the WSGI server at the end of the request, even
        if the client disconnects midstream.  Unless you're using Django's
        runserver, in which case you should expect to see Redis connections
        build up until http://bugs.python.org/issue16220 is fixed.
        """
        self.listener.close()


def to_sse(msg):
    """
    Given a Redis pubsub message that was published by a Sender (ie, has a JSON
    body with time, message, title, tags, and id), return a properly-formatted
    SSE string.
    """
    data = json.loads(msg['data'])

    # According to the SSE spec, lines beginning with a colon should be
    # ignored.  We can use that as a way to force zombie listeners to try
    # pushing something down the socket and clean up their redis connections
    # when they get an error.
    # See http://dev.w3.org/html5/eventsource/#event-stream-interpretation
    if data['message'] == '_flush':
        return ":\n"  # Administering colonic!

    if 'id' in data:
        out = "id: " + data['id'] + '\n'
    else:
        out = ''
    if 'name' in data:
        out += 'name: ' + data['name'] + '\n'

    payload = json.dumps({
        'time': data['time'],
        'message': data['message'],
        'tags': data['tags'],
        'title': data['title'],
    })
    out += 'data: ' + payload + '\n\n'
    return out
于 2012-10-15T02:14:46.507 に答える