4

C# を使用して、TCP 接続を介してデータ (ITCH データ、つまり外国為替価格) の継続的なストリームを読み取りますが、アプリケーションを長時間実行した後、アプリケーションがパケットをドロップして情報が失われることがあります。

以下は、データを読み取るために使用しているコード スニペットです。

private void ReaderThreadStarter()
    {
        StreamReader streamReader = new StreamReader(this._networkStream);
        while (!_stopping)
        {
            try
            {
                if (this._networkStream.DataAvailable)
                {
                    while ((line = streamReader.ReadLine()) != null)
                    {
                        lock (_queue.ConcurrentQueue)
                        {
                            byte[] data = System.Text.Encoding.ASCII.GetBytes(line);
                            Log.Info("Data Added in Queue: " + Encoding.ASCII.GetString(data, 0, data.Length));
                            _queue.WriteToQueue(data);
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                Log.Error(exception);
            }
            finally
            {
                SetStopped();
            }
        }
    }

上記のコードが行うことは、TCP 接続からデータを読み取り、それを並行キューに書き込み、別のスレッドがキュー内のデータを使用して処理することです。したがって、基本的には単純な生産者と消費者の問題です。

キューに書き込んだものが消費者によって使用されるため、生産者と消費者の部分はうまく機能しているようです。

1 つのオプションは、スニファーを使用して、アプリケーションがパケットをドロップしていることを確認することでしたが、私はスニファーを使用できない環境で作業しています。パケット ドロップがあると私が考える理由は、一部の外国為替注文ではキャンセルがなく、価格が下がり、データ プロバイダーは最終的に価格が正しいと言うからです。

キューに保存する前にTCPポートから読み取ったデータもログに記録しているため、ログから、接続からの読み取りでデータが失われると想定しています。

ここで何が間違っているのか、またはパケットをドロップする理由を教えてください。

以下は、私の消費者コードのコード スニペットです。

public void ReadQueue()
    {
        try
        {
            while (true)
            {
                {
                    byte[] data = _queue.ReadFromQueue();

                    Parse(data);
                }
            }
        }
        catch (Exception exception)
        {
            Log.Error(exception);
        }
    }

public byte[] ReadFromQueue()
    {
        try
        {
            byte[] data;
            lock (this) // Enter synchronization block
            {
                ConcurrentQueue.TryDequeue(out data);
            } 
            return data;
        }
        catch (Exception exception)
        {
            Log.Error(exception);
            return null;
        }
    }
4

1 に答える 1

7

私の目を引くものが 2 つあります。最初は の使用ですDataAvailable。これを使用することは、事実上決して正しいことではありません。これが役立つ主な場合は、同期方法と非同期方法のどちらかを選択する場合です。たとえば、より多くのデータがインバウンドであるかどうかはわかりませが、「誤検知」が発生する可能性があり (意味のないものに使用しているため)、ループがすぐに終了する可能性があります。DataAvailableは、データがローカル バッファで現在使用可能かどうかのみを示します。それがすべてです

data2 番目に興味があるのは、バイナリかテキストかということです。あなたが使用しているという事実StreamReaderはテキストを示唆していますが、それから...なぜそれを再エンコードするのbyte[]ですか? 任意のバイナリの場合、テキストとして処理することはできません。これは機能しません。経由でフェッチするまでに、コンテンツStreamReaderすでに破損しています。テキストベースのプロトコルの場合は、再エンコードしないでください。文字列のキュー (または類似のもの) を使用してください。

無関係なメモとして...キューが本当に同時実行である場合、おそらくアクセスを同期する必要はありません。

于 2012-05-28T11:56:27.320 に答える