8

マルチスレッドTCPサーバーでインターロックされたMonitor.WaitとMonitor.Pulseに問題があります。私の問題を示すために、ここに私のサーバーコードがあります:

public class Server
{
    TcpListener listener;
    Object sync;
    IHandler handler;
    bool running;

    public Server(IHandler handler, int port)
    {
        this.handler = handler;
        IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
        listener = new TcpListener(address, port);
        sync = new Object();
        running = false;
    }

    public void Start()
    {
        Thread thread = new Thread(ThreadStart);
        thread.Start();
    }

    public void Stop()
    {
        lock (sync)
        {
            listener.Stop();
            running = false;
            Monitor.Pulse(sync);
        }
    }

    void ThreadStart()
    {
        if (!running)
        {
            listener.Start();
            running = true;
            lock (sync)
            {
                while (running)
                {
                    try
                    {
                        listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                        Monitor.Wait(sync);  // Release lock and wait for a pulse
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e.Message);
                    }
                }
            }
        }
    }

    void Accept(IAsyncResult result)
    {
        // Let the server continue listening
        lock (sync)
        {
            Monitor.Pulse(sync);
        } 

        if (running)
        {
            TcpListener listener = (TcpListener)result.AsyncState;
            using (TcpClient client = listener.EndAcceptTcpClient(result))
            {
                handler.Handle(client.GetStream());
            }
        }
    }
}

そして、これが私のクライアントコードです:

class Client
{
    class EchoHandler : IHandler
    {
        public void Handle(Stream stream)
        {
            System.Console.Out.Write("Echo Handler: ");
            StringBuilder sb = new StringBuilder();
            byte[] buffer = new byte[1024];
            int count = 0;
            while ((count = stream.Read(buffer, 0, 1024)) > 0)
            {
                sb.Append(Encoding.ASCII.GetString(buffer, 0, count));
            }
            System.Console.Out.WriteLine(sb.ToString());
            System.Console.Out.Flush();
        }
    }

    static IPAddress localhost = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];

    public static int Main()
    {
        Server server1 = new Server(new EchoHandler(), 1000);
        Server server2 = new Server(new EchoHandler(), 1001);

        server1.Start();
        server2.Start();

        Console.WriteLine("Press return to test...");
        Console.ReadLine();

        // Note interleaved ports
        SendMsg("Test1", 1000);
        SendMsg("Test2", 1001);
        SendMsg("Test3", 1000);
        SendMsg("Test4", 1001);
        SendMsg("Test5", 1000);
        SendMsg("Test6", 1001);
        SendMsg("Test7", 1000);

        Console.WriteLine("Press return to terminate...");
        Console.ReadLine();

        server1.Stop();
        server2.Stop();

        return 0;
    }

    public static void SendMsg(String msg, int port)
    {
        IPEndPoint endPoint = new IPEndPoint(localhost, port);

        byte[] buffer = Encoding.ASCII.GetBytes(msg);
        using (Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        {
            s.Connect(endPoint);
            s.Send(buffer);
        }
    }
}

クライアントは7つのメッセージを送信しますが、サーバーは4つしか出力しません。

Returnキーを押してテストします...

Returnキーを押して終了します...
エコーハンドラー:Test1
エコーハンドラー:Test3
エコーハンドラー:Test2
エコーハンドラー:Test4

が呼び出されるまでオブジェクトをロックしているはずなのに、(サーバーのメソッドで)発生する前に(Pulseサーバーのメソッドで)発生することを許可することでモニターが混乱しているのではないかと思います。そうすれば、メソッドはロックを取得できます。そしてそのを送信します。サーバーのメソッドでこれらの2行をコメントアウトする場合:AcceptWaitThreadStartThreadStartsyncMonitor.Wait()AcceptPulseStop()

//listener.Stop();
//running = false;

残りのメッセージは、サーバーのStop()メソッドが呼び出されたときに表示されます(つまり、サーバーのオブジェクトをウェイクアップするsyncと、残りの着信メッセージがディスパッチされます)。ThreadStartこれはとメソッド間の競合状態でのみ発生するように思われますが、オブジェクトAcceptの周囲をロックすることでこれを防ぐことができます。sync

何か案は?

サイモン、どうもありがとう。

ps。出力が乱れているなどのことを認識していることに注意してください。特に、ロックとモニターの間の競合状態について質問しています。乾杯、SH。

4

1 に答える 1

5

問題は、Pulse/Wait をシグナルとして使用していることです。AutoResetEvent などの適切なシグナルには、スレッドが WaitOne() を呼び出すまでシグナル状態が維持されるような状態があります。スレッドを待機させずに Pulse を呼び出すと、ノープになります。

これは、ロックが同じスレッドによって何度も取得される可能性があるという事実と組み合わされています。非同期プログラミングを使用しているため、Accept コールバックは、BeginAcceptTcpClient を実行した同じスレッドから呼び出すことができます。

説明しましょう。2 番目のサーバーをコメントアウトし、サーバーのコードをいくつか変更しました。

void ThreadStart()
{
    if (!running)
    {
        listener.Start();
        running = true;
        lock (sync)
        {
            while (running)
            {
                try
                {
                    Console.WriteLine("BeginAccept [{0}]", 
                        Thread.CurrentThread.ManagedThreadId);
                    listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                    Console.WriteLine("Wait [{0}]", 
                        Thread.CurrentThread.ManagedThreadId);
                    Monitor.Wait(sync);  // Release lock and wait for a pulse
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }
}

void Accept(IAsyncResult result)
{
    // Let the server continue listening
    lock (sync)
    {
        Console.WriteLine("Pulse [{0}]", 
            Thread.CurrentThread.ManagedThreadId);
        Monitor.Pulse(sync);
    }
    if (running)
    {
        TcpListener localListener = (TcpListener)result.AsyncState;
        using (TcpClient client = localListener.EndAcceptTcpClient(result))
        {
            handler.Handle(client.GetStream());
        }
    }
}

私の実行からの出力を以下に示します。このコードを自分で実行すると、値は異なりますが、一般的には同じになります。

Press return to test...
BeginAccept [3]
Wait [3]

Press return to terminate...
Pulse [5]
BeginAccept [3]
Pulse [3]
Echo Handler: Test1
Echo Handler: Test3
Wait [3]

ご覧のとおり、2 つの Pulse が呼び出されています。1 つは別のスレッド (Pulse [5]) からのもので、最初の Wait を起動します。スレッド 3 は別の BeginAccept を実行しますが、Pending の着信接続があるため、そのスレッドは Accept コールバックをすぐに呼び出すことを決定します。Accept は同じスレッドによって呼び出されるため、Lock(sync) はブロックされませんが、Pulse [3] は空のスレッド キューですぐに実行されます。

2 つのハンドラーが呼び出され、2 つのメッセージを処理します。

すべて問題なく、ThreadStart が再び実行を開始し、無期限に待機します。

ここでの根本的な問題は、モニターを信号として使用しようとしているということです。状態を記憶していないため、2 番目の Pulse は失われます。

しかし、これには簡単な解決策があります。適切な信号である AutoResetEvents を使用すると、その状態が記憶されます。

public Server(IHandler handler, int port)
{
    this.handler = handler;
    IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
    listener = new TcpListener(address, port);
    running = false;
    _event = new AutoResetEvent(false);
}

public void Start()
{
    Thread thread = new Thread(ThreadStart);
    thread.Start();
}

public void Stop()
{
    listener.Stop();
    running = false;
    _event.Set();
}

void ThreadStart()
{
    if (!running)
    {
        listener.Start();
        running = true;
        while (running)
        {
            try
            {
                listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                _event.WaitOne();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }
}

void Accept(IAsyncResult result)
{
    // Let the server continue listening
    _event.Set();
    if (running)
    {
        TcpListener localListener = (TcpListener) result.AsyncState;
        using (TcpClient client = localListener.EndAcceptTcpClient(result))
        {
            handler.Handle(client.GetStream());
        }
    }
}
于 2009-05-12T10:13:19.920 に答える