TCP サーバーは SocketAsyncEventArgs を使用して開発され、Windows サービスとしての非同期メソッドです。Main の先頭に次の 2 行のコードがあります。
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
どちらも true を返します (戻り値はログに記録されます)。現在、2000 から 3000 のクライアントがこのサーバーにメッセージを送信し始め、接続を受け入れ始めます (接続の数を数えてみると、予想どおりです - 接続プールがあります)。サーバー プロセスのスレッド数は ~2050 から ~3050 に増加します。ここまでは順調ですね!
これで、ReceiveAsync が true を返した後、または SocketAsyncEventArgs の Completed イベントによって呼び出される Received メソッドがあります。
そして、ここから問題が始まります。どれだけ多くのクライアントが接続され、どれだけ多くのメッセージが送信されても、Received は 1 秒間に最大 20 回呼び出されます! クライアントの数が増えると、この数 (20) は最大 10 に減少します。
環境: TCP サーバーとクライアントは、同じマシン上でシミュレートされています。2 台のマシンでコードをテストしました。1 台には 2 コア CPU と 4GB RAM が搭載され、もう 1 台には 8 コア CPU と 12GB RAM が搭載されています。(まだ) データの損失はなく、各受信操作で複数のメッセージを受信することがあります。それはいいです。しかし、受信操作の数を増やすにはどうすればよいでしょうか。
実装に関する追加の注意事項: コードは大きく、さまざまなロジックが含まれています。全体的な説明は次のとおりです。新しい接続を受け入れるための単一の SocketAsyncEventArgs があります。それはうまくいきます。ここで、新しく受け入れられた接続ごとに、データを受信するための新しい SocketAsyncEventArgs を作成します。これ (受信用に作成された SocketAsyncEventArgs) をプールに入れました。再利用されることはありませんが、UserToken は接続の追跡に使用されています。たとえば、切断された接続や 7 分間データを送信していない接続は閉じられて破棄されます (SocketAsyncEventArgs の AcceptSocket はシャットダウンされ (両方)、閉じられて破棄され、SocketAsyncEventArgs オブジェクト自体も同様に破棄されます)。
class Sudo
{
Socket _listener;
int _port = 8797;
public Sudo()
{
var ipEndPoint = new IPEndPoint(IPAddress.Any, _port);
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.Bind(ipEndPoint);
_listener.Listen(100);
Accept(null);
}
void Accept(SocketAsyncEventArgs acceptEventArg)
{
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += AcceptCompleted;
}
else acceptEventArg.AcceptSocket = null;
bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ;
if (!willRaiseEvent) Accepted(acceptEventArg);
}
void AcceptCompleted(object sender, SocketAsyncEventArgs e)
{
Accepted(e);
}
void Accepted(SocketAsyncEventArgs e)
{
var acceptSocket = e.AcceptSocket;
var readEventArgs = CreateArg(acceptSocket);
var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs);
Accept(e);
if (!willRaiseEvent) Received(readEventArgs);
}
SocketAsyncEventArgs CreateArg(Socket acceptSocket)
{
var arg = new SocketAsyncEventArgs();
arg.Completed += IOCompleted;
var buffer = new byte[64 * 1024];
arg.SetBuffer(buffer, 0, buffer.Length);
arg.AcceptSocket = acceptSocket;
arg.SocketFlags = SocketFlags.None;
return arg;
}
void IOCompleted(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
Received(e);
break;
default: break;
}
}
void Received(SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0)
{
// Kill(e);
return;
}
var bytesList = new List<byte>();
for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]);
var bytes = bytesList.ToArray();
Process(bytes);
ReceiveRest(e);
Perf.IncOp();
}
void ReceiveRest(SocketAsyncEventArgs e)
{
e.SocketFlags = SocketFlags.None;
for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0;
e.SetBuffer(0, e.Buffer.Length);
var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e);
if (!willRaiseEvent) Received(e);
}
void Process(byte[] bytes) { }
}