Django アプリにServer Sent Event APIを実装して、リアルタイムの更新をバックエンドからブラウザーにストリーミングしました。バックエンドは Redis pubsub です。私の Django ビューは次のようになります。
def event_stream(request):
"""
Stream worker events out to browser.
"""
listener = events.Listener(
settings.EVENTS_PUBSUB_URL,
channels=[settings.EVENTS_PUBSUB_CHANNEL],
buffer_key=settings.EVENTS_BUFFER_KEY,
last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
)
return http.HttpResponse(listener, mimetype='text/event-stream')
そして、イテレータとして返す events.Listener クラスは次のようになります。
class Listener(object):
def __init__(self, rcon_or_url, channels, buffer_key=None,
last_event_id=None):
if isinstance(rcon_or_url, redis.StrictRedis):
self.rcon = rcon_or_url
elif isinstance(rcon_or_url, basestring):
self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
self.channels = channels
self.buffer_key = buffer_key
self.last_event_id = last_event_id
self.pubsub = self.rcon.pubsub()
self.pubsub.subscribe(channels)
def __iter__(self):
# If we've been initted with a buffer key, then get all the events off
# that and spew them out before blocking on the pubsub.
if self.buffer_key:
buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)
# check whether msg with last_event_id is still in buffer. If so,
# trim buffered_events to have only newer messages.
if self.last_event_id:
# Note that we're looping through most recent messages first,
# here
counter = 0
for msg in buffered_events:
if (json.loads(msg)['id'] == self.last_event_id):
break
counter += 1
buffered_events = buffered_events[:counter]
for msg in reversed(list(buffered_events)):
# Stream out oldest messages first
yield to_sse({'data': msg})
try:
for msg in self.pubsub.listen():
if msg['type'] == 'message':
yield to_sse(msg)
finally:
logging.info('Closing pubsub')
self.pubsub.close()
self.rcon.connection_pool.disconnect()
このセットアップで、イベントをブラウザーに正常にストリーミングできます。ただし、リスナーの「最終的に」の切断呼び出しは、実際には呼び出されないようです。pubsub からメッセージが来るのを待って、彼らはまだ野宿していると思います。クライアントが切断して再接続すると、Redis インスタンスへの接続数が増加し、減少することはありません。約 1000 になると、Redis は異常を起こし始め、利用可能なすべての CPU を消費します。
クライアントがリッスンしなくなったことを検出し、その時点で Redis 接続を閉じられるようにしたいと考えています。
私が試したり考えたりしたこと:
- 接続プール。しかし、redis-py README が述べているように、「PubSub または Pipeline オブジェクトをスレッド間で渡すことは安全ではありません。」
- 接続、または単に切断を処理するミドルウェア。ミドルウェアの process_response() メソッドがあまりにも早く (http ヘッダーがクライアントに送信される前に) 呼び出されるため、これは機能しません。コンテンツをストリーミングしている最中にクライアントが切断されたときに呼び出されるものが必要です。
- request_finishedおよびgot_request_exceptionシグナル。最初のものは、ミドルウェアの process_response() のように、起動が早すぎるようです。2 番目は、クライアントがストリームの途中で切断されたときに呼び出されません。
最終的なしわ: 本番環境では Gevent を使用しているため、一度に多くの接続を開いたままにしておくことができます。ただし、この接続リークの問題は、単純な古い「manage.py runserver」、Gevent のモンキーパッチを適用した runserver、または Gunicorn の gevent ワーカーを使用している場合に発生します。