0

非常に単純な通知システムを作成しようとしています (SignalIR などを使用したくない)。次のテストコードがあります。

クライアント側:

var source = new EventSource('/notifications.axd');

source.onopen = function () {
    Console.log("Connection open");
};

source.onerror = function () {
    Console.log("Connection error");
};

source.onmessage = function (event) {
    Console.log("Message: " + event.data);
};

サーバ側:

public class NotificationMessage {
    public NotificationMessage() {
        Id = Guid.NewGuid().ToString();
    }

    public string Id { get; private set; }
}

public class NotificationsHandler : HttpTaskAsyncHandler {
    private const string CONTENT_TYPE = "text/event-stream";

    private sealed class NotificationItem {
        public ConcurrentQueue<NotificationMessage> Messages;
        public CancellationTokenSource CancellationTokenSource;
    }

    private static readonly ConcurrentDictionary<string, NotificationItem> _tasks =
        new ConcurrentDictionary<string, NotificationItem>();

    public static void Notify(string hostId, string userId, NotificationMessage message) {
        NotificationItem item;
        if (!_tasks.TryGetValue(string.Format("{0}|{1}", hostId, userId), out item)) {
            return;
        }

        var tokenSource = item.CancellationTokenSource;
        item.Messages.Enqueue(message);
        item.CancellationTokenSource = new CancellationTokenSource();
        tokenSource.Cancel();
    }

    public override async Task ProcessRequestAsync(HttpContext context) {
        HttpRequest request = context.Request;

        NotificationItem item = _tasks.GetOrAdd(
            string.Format("{0}|{1}", request.Url.Host, CsSession.Data.CurrentUser.Id), 
            k => new NotificationItem {
                Messages = new ConcurrentQueue<NotificationMessage>(),
                CancellationTokenSource = new CancellationTokenSource()
            }
        );

        HttpResponse response = context.Response;

        response.ContentType = CONTENT_TYPE;
        response.CacheControl = "no-cache";
        response.ContentEncoding = Encoding.UTF8;
        response.AppendHeader("connection", "keep-alive");

        response.BufferOutput = false;
        bool supportsAsyncFlush = response.SupportsAsyncFlush;
        bool shouldPing = true;

        while (response.IsClientConnected) {
            try {
                NotificationMessage message = null;
                if ((!item.Messages.IsEmpty && item.Messages.TryDequeue(out message)) || shouldPing) {
                    response.Write(string.Format("data:{0}\n\n", message == null ? "{}" : JsonMapper.Serialize(message)));

                    if (supportsAsyncFlush) {
                        await Task.Factory.FromAsync(response.BeginFlush, response.EndFlush, null);

                    } else {
                        response.Flush();
                    }
                } 

            } catch (Exception) {
                break;
            }

            var delay = Task.Delay(15000, item.CancellationTokenSource.Token);
            await delay;
            shouldPing = delay.Status == TaskStatus.RanToCompletion;
        }
    }
}

問題は、上記が機能しないことです。2 つの問題があります。

1) クライアントが接続すると、空のパケットを受信します (問題ありません)。次に、メッセージをエンキューしない場合、Task.Delay を待った後、ループは再び空のメッセージを書き込もうとしますが、どこにあるのかわかりません。response.Write 行は返されません (クライアントでは何も受信されません)。

2) キューに書き込むと、何らかの理由で接続が切断されます。await 遅延の後に行にブレークポイントを設定すると、その行は実行されません (私のロジックではそうではありません:))。トークンをキャンセルすると、遅延タスクは終了するはずですが、ハンドラー全体を中止しているようです??

4

0 に答える 0