非常に単純な通知システムを作成しようとしています (SignalIR などを使用したくない)。次のテストコードがあります。
クライアント側:
var source = new EventSource('/notifications.axd');
source.onopen = function () {
Console.log("Connection open");
};
source.onerror = function () {
Console.log("Connection error");
};
source.onmessage = function (event) {
Console.log("Message: " + event.data);
};
サーバ側:
public class NotificationMessage {
public NotificationMessage() {
Id = Guid.NewGuid().ToString();
}
public string Id { get; private set; }
}
public class NotificationsHandler : HttpTaskAsyncHandler {
private const string CONTENT_TYPE = "text/event-stream";
private sealed class NotificationItem {
public ConcurrentQueue<NotificationMessage> Messages;
public CancellationTokenSource CancellationTokenSource;
}
private static readonly ConcurrentDictionary<string, NotificationItem> _tasks =
new ConcurrentDictionary<string, NotificationItem>();
public static void Notify(string hostId, string userId, NotificationMessage message) {
NotificationItem item;
if (!_tasks.TryGetValue(string.Format("{0}|{1}", hostId, userId), out item)) {
return;
}
var tokenSource = item.CancellationTokenSource;
item.Messages.Enqueue(message);
item.CancellationTokenSource = new CancellationTokenSource();
tokenSource.Cancel();
}
public override async Task ProcessRequestAsync(HttpContext context) {
HttpRequest request = context.Request;
NotificationItem item = _tasks.GetOrAdd(
string.Format("{0}|{1}", request.Url.Host, CsSession.Data.CurrentUser.Id),
k => new NotificationItem {
Messages = new ConcurrentQueue<NotificationMessage>(),
CancellationTokenSource = new CancellationTokenSource()
}
);
HttpResponse response = context.Response;
response.ContentType = CONTENT_TYPE;
response.CacheControl = "no-cache";
response.ContentEncoding = Encoding.UTF8;
response.AppendHeader("connection", "keep-alive");
response.BufferOutput = false;
bool supportsAsyncFlush = response.SupportsAsyncFlush;
bool shouldPing = true;
while (response.IsClientConnected) {
try {
NotificationMessage message = null;
if ((!item.Messages.IsEmpty && item.Messages.TryDequeue(out message)) || shouldPing) {
response.Write(string.Format("data:{0}\n\n", message == null ? "{}" : JsonMapper.Serialize(message)));
if (supportsAsyncFlush) {
await Task.Factory.FromAsync(response.BeginFlush, response.EndFlush, null);
} else {
response.Flush();
}
}
} catch (Exception) {
break;
}
var delay = Task.Delay(15000, item.CancellationTokenSource.Token);
await delay;
shouldPing = delay.Status == TaskStatus.RanToCompletion;
}
}
}
問題は、上記が機能しないことです。2 つの問題があります。
1) クライアントが接続すると、空のパケットを受信します (問題ありません)。次に、メッセージをエンキューしない場合、Task.Delay を待った後、ループは再び空のメッセージを書き込もうとしますが、どこにあるのかわかりません。response.Write 行は返されません (クライアントでは何も受信されません)。
2) キューに書き込むと、何らかの理由で接続が切断されます。await 遅延の後に行にブレークポイントを設定すると、その行は実行されません (私のロジックではそうではありません:))。トークンをキャンセルすると、遅延タスクは終了するはずですが、ハンドラー全体を中止しているようです??