4

同様の問題または同様の問題を経験している人を見つけるのに苦労しています。

私は現在、GZip 要件がある http (json) 経由のストリームを消費しています。データが送信されてからreader.ReadLine()読み取られるまでに遅延が発生しています。これは、バッファにデータを保持するデコードに関連している可能性があることが示唆されていますか?

これは私が現在持っているもので、遅延を除けば問題なく動作します。

HttpWebRequest request = (HttpWebRequest)WebRequest.Create(endPoint);
request.Method = "GET";

request.PreAuthenticate = true;
request.Credentials = new NetworkCredential(username, password);

request.AutomaticDecompression = DecompressionMethods.GZip;
request.ContentType = "application/json";
request.Accept = "application/json";
request.Timeout = 30;
request.BeginGetResponse(AsyncCallback, request);

次に、 AsyncCallback メソッド内に次のものがあります。

HttpWebRequest request = result.AsyncState as HttpWebRequest;

using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8))
{

    while (!reader.EndOfStream)
    {
        string line = reader.ReadLine();
        if (string.IsNullOrWhiteSpace(line)) continue;

        Console.WriteLine(line);
    }
}

reader.Readline()より多くのデータが受信されるまで待機し、その一部を保留します。キープアライブ改行も受信されます。これらは、何かを読み取ることを決定したときに一度に読み取られることがよくあります。

curl コマンドを実行してストリームを並行して実行することをテストしました。curl コマンドはデータを完全に正常に受信して解凍します。

どんな洞察も素晴らしいでしょう。ありがとう、

ダン

編集 streamreader でバッファ サイズを使用してもうまくいきませんでした。

new StreamReader(stream, Encoding.UTF8, true, 1)

EDIT また、.NET 4.5に更新して使用することもできませんでした

request.AllowReadStreamBuffering = false;
4

3 に答える 3

5

更新:これは、ボリュームのレートが高い場合に長時間にわたって問題があるようであり、バッファがアプリケーションの機能に影響を与える小さなボリュームでのみ使用する必要があります。それ以来、私はに戻ってきましたStreamReader

それで、これが私が思いついたものです。これは遅延なく機能します。これは、自動化された GZip 解凍によってバッファリングされません。

using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
using (MemoryStream memory = new MemoryStream())
using (GZipStream gzip = new GZipStream(memory, CompressionMode.Decompress))
{
    byte[] compressedBuffer = new byte[8192];
    byte[] uncompressedBuffer = new byte[8192];
    List<byte> output = new List<byte>();

    while (stream.CanRead)
    {
        int readCount = stream.Read(compressedBuffer, 0, compressedBuffer.Length);

        memory.Write(compressedBuffer.Take(readCount).ToArray(), 0, readCount);
        memory.Position = 0;

        int uncompressedLength = gzip.Read(uncompressedBuffer, 0, uncompressedBuffer.Length);

        output.AddRange(uncompressedBuffer.Take(uncompressedLength));

        if (!output.Contains(0x0A)) continue;

        byte[] bytesToDecode = output.Take(output.LastIndexOf(0x0A) + 1).ToArray();
        string outputString = Encoding.UTF8.GetString(bytesToDecode);
        output.RemoveRange(0, bytesToDecode.Length);

        string[] lines = outputString.Split(new[] { Environment.NewLine }, new StringSplitOptions());
        for (int i = 0; i < (lines.Length - 1); i++)
        {
            Console.WriteLine(lines[i]);
        }

        memory.SetLength(0);
    }
}
于 2013-02-08T17:31:49.943 に答える
1

C.Evenhuis が議論している Delayed ACK には何かがあるかもしれませんが、私はそれStreamReaderが頭痛の原因であると直感的に感じています...次のようなことを試してみてください:

public void AsyncCallback(IAsyncResult result)
{
    HttpWebRequest request = result.AsyncState as HttpWebRequest;   
    using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
    using (Stream stream = response.GetResponseStream())
    {
        var buffer = new byte[2048];
        while(stream.CanRead)
        {
            var readCount = stream.Read(buffer, 0, buffer.Length);
            var line = Encoding.UTF8.GetString(buffer.Take(readCount).ToArray());
            Console.WriteLine(line);
        }
    }
}

編集:これが私がこの理論をテストするために使用した完全なハーネスです(おそらくあなたの状況との違いがあなたに飛び出すでしょう)

(LINQPad 対応)

void Main()
{
    Task.Factory.StartNew(() => Listener());
    _blocker.WaitOne();
    Request();
}

public bool _running;
public ManualResetEvent _blocker = new ManualResetEvent(false);

public void Listener()
{
    var listener = new HttpListener();
    listener.Prefixes.Add("http://localhost:8080/");
    listener.Start();
    "Listener is listening...".Dump();;
    _running = true;
    _blocker.Set();
    var ctx = listener.GetContext();
    "Listener got context".Dump();
    ctx.Response.KeepAlive = true;
    ctx.Response.ContentType = "application/json";
    var outputStream = ctx.Response.OutputStream;
    using(var zipStream = new GZipStream(outputStream, CompressionMode.Compress))
    using(var writer = new StreamWriter(outputStream))
    {
        var lineCount = 0;
        while(_running && lineCount++ < 10)
        {
            writer.WriteLine("{ \"foo\": \"bar\"}");
            "Listener wrote line, taking a nap...".Dump();
            writer.Flush();
            Thread.Sleep(1000);
        }
    }
    listener.Stop();
}

public void Request()
{
    var endPoint = "http://localhost:8080";
    var username = "";
    var password = "";
    HttpWebRequest request = (HttpWebRequest)WebRequest.Create(endPoint);
    request.Method = "GET";

    request.PreAuthenticate = true;
    request.Credentials = new NetworkCredential(username, password);

    request.AutomaticDecompression = DecompressionMethods.GZip;
    request.ContentType = "application/json";
    request.Accept = "application/json";
    request.Timeout = 30;
    request.BeginGetResponse(AsyncCallback, request);
}

public void AsyncCallback(IAsyncResult result)
{
    Console.WriteLine("In AsyncCallback");    
    HttpWebRequest request = result.AsyncState as HttpWebRequest;    
    using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
    using (Stream stream = response.GetResponseStream())
    {
        while(stream.CanRead)
        {
            var buffer = new byte[2048];
            var readCount = stream.Read(buffer, 0, buffer.Length);
            var line = Encoding.UTF8.GetString(buffer.Take(readCount).ToArray());
            Console.WriteLine("Reader got:" + line);
        }
    }
}

出力:

Listener is listening...
Listener got context
Listener wrote line, taking a nap...
In AsyncCallback
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}
于 2013-02-07T21:08:11.920 に答える
0

これは、Nagle のアルゴリズムと組み合わせた遅延 ACKに関係している可能性があります。サーバーが複数の小さな応答を続けて送信すると発生します。

サーバー側では、最初の応答が送信されますが、後続の応答データ チャンクは、サーバーがクライアントから ACK を受信した場合、または大きなパケットを送信するのに十分なデータが得られるまで送信されます (Nagle のアルゴリズム)。

クライアント側では、応答の最初のビットが受信されますが、ACK はすぐには送信されません。従来のアプリケーションには要求-応答-要求-応答の動作があるため、次の要求と共に ACK を送信できると想定されます。あなたのケースは起こりません。

一定時間 (500 ミリ秒?) が経過した後、とにかく ACK を送信することを決定し、サーバーが蓄積した次のパッケージを送信するようにします。

NoDelay問題 (これが実際に発生している問題である場合) は、プロパティを設定して Nagle のアルゴリズムを無効にすることにより、サーバー側でソケット レベルで修正できます。オペレーティングシステム全体で無効にすることもできると思います。

クライアント側で遅延ACKを一時的に無効にすることもできます(Windowsにはレジストリエントリがあることがわかっています)。サーバーで何も変更することなく、これが実際に問題であるかどうかを確認できます。遅延 ACK は DDOS 攻撃を防ぐため、後で必ず設定を復元してください。

キープアライブの送信頻度を下げることも有効ですが、それでも問題が発生する可能性はあります。

于 2013-02-07T20:53:14.823 に答える