C# を使用して、TCP 接続を介してデータ (ITCH データ、つまり外国為替価格) の継続的なストリームを読み取りますが、アプリケーションを長時間実行した後、アプリケーションがパケットをドロップして情報が失われることがあります。
以下は、データを読み取るために使用しているコード スニペットです。
private void ReaderThreadStarter()
{
StreamReader streamReader = new StreamReader(this._networkStream);
while (!_stopping)
{
try
{
if (this._networkStream.DataAvailable)
{
while ((line = streamReader.ReadLine()) != null)
{
lock (_queue.ConcurrentQueue)
{
byte[] data = System.Text.Encoding.ASCII.GetBytes(line);
Log.Info("Data Added in Queue: " + Encoding.ASCII.GetString(data, 0, data.Length));
_queue.WriteToQueue(data);
}
}
}
}
catch (Exception exception)
{
Log.Error(exception);
}
finally
{
SetStopped();
}
}
}
上記のコードが行うことは、TCP 接続からデータを読み取り、それを並行キューに書き込み、別のスレッドがキュー内のデータを使用して処理することです。したがって、基本的には単純な生産者と消費者の問題です。
キューに書き込んだものが消費者によって使用されるため、生産者と消費者の部分はうまく機能しているようです。
1 つのオプションは、スニファーを使用して、アプリケーションがパケットをドロップしていることを確認することでしたが、私はスニファーを使用できない環境で作業しています。パケット ドロップがあると私が考える理由は、一部の外国為替注文ではキャンセルがなく、価格が下がり、データ プロバイダーは最終的に価格が正しいと言うからです。
キューに保存する前にTCPポートから読み取ったデータもログに記録しているため、ログから、接続からの読み取りでデータが失われると想定しています。
ここで何が間違っているのか、またはパケットをドロップする理由を教えてください。
以下は、私の消費者コードのコード スニペットです。
public void ReadQueue()
{
try
{
while (true)
{
{
byte[] data = _queue.ReadFromQueue();
Parse(data);
}
}
}
catch (Exception exception)
{
Log.Error(exception);
}
}
public byte[] ReadFromQueue()
{
try
{
byte[] data;
lock (this) // Enter synchronization block
{
ConcurrentQueue.TryDequeue(out data);
}
return data;
}
catch (Exception exception)
{
Log.Error(exception);
return null;
}
}