サンプル Django SSE ビュー:
class SSEView(View):
    """Stream the requesting user's unseen events as Server-Sent Events.

    The response body is an endless generator that polls the ``EVENT``
    collection for documents newer than the last timestamp delivered to
    this session and emits each one as an SSE ``data:`` frame.

    NOTE(review): each open stream occupies one worker for its whole
    lifetime and blocks in ``time.sleep`` between polls; size the worker
    pool accordingly.
    """

    def dispatch(self, request, *args, **kwargs):
        """Return a streaming ``text/event-stream`` response.

        Args:
            request: The incoming request; its ``user`` and ``session``
                are read by :meth:`iterator`.
            *args: Positional URL captures forwarded by ``as_view()``;
                accepted for signature compatibility and ignored.
            **kwargs: Keyword URL captures; accepted and ignored.

        Returns:
            A ``StreamingHttpResponse`` whose content is produced lazily
            by :meth:`iterator`.
        """
        response = http.StreamingHttpResponse(
            streaming_content=self.iterator(request=request),
            content_type="text/event-stream",
        )
        # Intermediaries must not cache or replay an event stream.
        response["Cache-Control"] = "no-cache"
        # "Connection" is deliberately not set: it is a hop-by-hop header
        # that WSGI applications must not emit (PEP 3333), and wsgiref-based
        # servers reject it. Connection persistence is the server's job.
        return response

    def iterator(self, request):
        """Yield unseen events for the requester as SSE frames, forever.

        "Unseen" means the event's ``ts`` is greater than the timestamp
        stored under ``"user_ts"`` in the session. When the session has no
        timestamp yet, every event of the user counts as unseen.

        Args:
            request: The request whose user and session drive the stream.

        Yields:
            str: One ``data: <json>`` frame (terminated by a blank line)
            per event, oldest first.
        """
        # NOTE(review): stock Django users expose ``pk``/``id``; ``_id`` is
        # presumably provided by a Mongo-backed user model - confirm.
        user_id = request.user._id
        # Track the cursor locally; the session is only the durable copy.
        last_seen = request.session.get("user_ts")

        while True:
            query = {"user_id": user_id}  # Only the requester's events
            if last_seen is not None:
                # Comparing against None would match no event at all, so
                # the filter is added only once a timestamp is known.
                query["ts"] = {"$gt": last_seen}

            # Oldest unseen event first: without the sort a later event may
            # be returned, and advancing the cursor to it would skip the
            # earlier ones for good.
            # NOTE(review): ``sort=`` assumes EVENT is a PyMongo collection.
            event = EVENT.find_one(query, sort=[("ts", 1)])
            if not event:
                # Nothing new; back off for a random sub-second interval so
                # concurrent streams do not poll in lockstep.
                time.sleep(random())
                continue

            yield self._format_sse(event)

            # Advance only after the frame was handed to the server, so an
            # event that failed to go out is retried on reconnect.
            last_seen = event["ts"]
            request.session["user_ts"] = last_seen
            # The session middleware has already processed this response
            # before the generator runs, so the change must be persisted
            # explicitly.
            request.session.save()

    @staticmethod
    def _format_sse(event):
        """Serialize one event document into a single SSE ``data:`` frame."""
        import json  # local import: the module's import block is not visible here

        # default=str is deliberate: it stringifies non-JSON values that
        # database documents commonly carry (object ids, datetimes).
        # json.dumps never emits raw newlines, so one data line suffices.
        payload = json.dumps(event, default=str)
        return "data: {}\n\n".format(payload)