4

私の他の質問に関連していますが、問題が解決することを期待して非同期を試しています。そうではありません。

シンプルな SOCKS5 サーバーを作成しようとしています。このプログラムをSOCKS5として使用するようにブラウザ(firefox)を設定しました。アイデアは、プログラムがプロキシサーバーに接続し、必要な情報を提供し、サーバーが単にある接続から別の接続へのデータの読み取り/書き込みを行うというものです。これは単にそれを行い、何も記録もフィルタリングもしません。非常にシンプルですが、CPUの問題と、数ページをヒットした後サイトに接続するのに数秒かかるという事実により、完全に使用できなくなります. 一体どうしてこれがそんなに多くの CPU を消費するのでしょうか? また、サイトへの接続に時間がかかるのはなぜですか? 非同期と同期の両方がこれに苦しんでいます

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Timers;
using System.IO;
using System.Net;
using System.Threading;

namespace ProxyTest
{
    class Program
    {
        static ManualResetEvent tcpClientConnected =new ManualResetEvent(false);
        static void Main(string[] args)
        {
            var s2 = new TcpListener(9998);
            s2.Start();
            Task.Run(() =>
            {
                while (true)
                {
                    tcpClientConnected.Reset();
                    s2.BeginAcceptTcpClient(Blah, s2);
                    tcpClientConnected.WaitOne();
                }
            });
            while (true)
                System.Threading.Thread.Sleep(10000000);
        }

        static void Blah(IAsyncResult ar)
        {
            try
            {
                Console.WriteLine("Connection");
                TcpListener listener = (TcpListener)ar.AsyncState;
                using (var socketin = listener.EndAcceptTcpClient(ar))
                {
                    tcpClientConnected.Set();
                    var ns1 = socketin.GetStream();
                    var r1 = new BinaryReader(ns1);
                    var w1 = new BinaryWriter(ns1);

                    if (false)
                    {
                        var s3 = new TcpClient();
                        s3.Connect("127.0.0.1", 9150);
                        var ns3 = s3.GetStream();
                        var r3 = new BinaryReader(ns3);
                        var w3 = new BinaryWriter(ns3);
                        while (true)
                        {
                            while (ns1.DataAvailable)
                            {
                                var b = ns1.ReadByte();
                                w3.Write((byte)b);
                                //Console.WriteLine("1: {0}", b);
                            }
                            while (ns3.DataAvailable)
                            {
                                var b = ns3.ReadByte();
                                w1.Write((byte)b);
                                Console.WriteLine("2: {0}", b);
                            }
                        }
                    }

                    {
                        if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                            return;
                        var c = r1.ReadByte();
                        for (int i = 0; i < c; ++i)
                            r1.ReadByte();
                        w1.Write((byte)5);
                        w1.Write((byte)0);
                    }
                    {
                        if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                            return;
                        if (r1.ReadByte() != 0)
                            return;
                    }
                    byte[] ipAddr = null;
                    string hostname = null;
                    var type = r1.ReadByte();
                    switch (type)
                    {
                        case 1:
                            ipAddr = r1.ReadBytes(4);
                            break;
                        case 3:
                            hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                            break;
                        case 4:
                            throw new Exception();
                    }
                    var nhport = r1.ReadInt16();
                    var port = IPAddress.NetworkToHostOrder(nhport);

                    var socketout = new TcpClient();
                    if (hostname != null)
                        socketout.Connect(hostname, port);
                    else
                        socketout.Connect(new IPAddress(ipAddr), port);

                    w1.Write((byte)5);
                    w1.Write((byte)0);
                    w1.Write((byte)0);
                    w1.Write(type);
                    switch (type)
                    {
                        case 1:
                            w1.Write(ipAddr);
                            break;
                        case 2:
                            w1.Write((byte)hostname.Length);
                            w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                            break;
                    }
                    w1.Write(nhport);

                    var buf1 = new byte[4096];
                    var buf2 = new byte[4096];
                    var ns2 = socketout.GetStream();
                    var r2 = new BinaryReader(ns2);
                    var w2 = new BinaryWriter(ns2);
                    Task.Run(() =>
                    {
                        var re = new ManualResetEvent(false);
                        while (true)
                        {
                            re.Reset();
                            ns1.BeginRead(buf1, 0, buf1.Length, ReadCallback, new A() { buf = buf1, thisSocket = socketin, otherSocket = socketout, thisStream = ns1, otherStream = ns2, re=re });
                            re.WaitOne();
                        }
                    });
                    Task.Run(() =>
                    {
                        var re = new ManualResetEvent(false);
                        while (true)
                        {
                            re.Reset();
                            ns2.BeginRead(buf2, 0, buf2.Length, ReadCallback, new A() { buf = buf2, thisSocket = socketout, otherSocket = socketin, thisStream = ns2, otherStream = ns1, re = re });
                            re.WaitOne();
                        }
                    });
                    while (true)
                    {
                        if (socketin.Connected == false)
                            return;
                        Thread.Sleep(100);
                    }
                }
            }
            catch { }
        }
        class A { public byte[] buf; public TcpClient thisSocket, otherSocket; public NetworkStream thisStream, otherStream; public ManualResetEvent re;};
        static void ReadCallback(IAsyncResult ar)
        {
            try
            {
                var a = (A)ar.AsyncState;
                var ns1 = a.thisStream;
                var len = ns1.EndRead(ar);
                a.otherStream.Write(a.buf, 0, len);
                a.re.Set();
            }
            catch
            {
            }
        }
    }
}
4

5 に答える 5

5

ここでいくつかのことが起こっています!

非同期呼び出しはすべて同期スタイルと呼ばれます。のように、操作を開始するスレッドは WaitOne を呼び出します - これは基本的に同期呼び出しと同等にするだけで、違いはありません。

睡眠ループは悪いです。 sleep(1) ループは迅速に応答しますが、CPU をいくらか使用します。sleep(1000) ループは応答が遅くなりますが、CPU の使用量は少なくなります。スリープ ループに多数のスレッドを配置しても CPU はあまり使用されませんが、スレッドの数が増え続けると、CPU の使用率が大幅に高くなります。最良の方法は、ポーリングの代わりに非同期呼び出しを使用することです。

loops を実行する多くのタスク。出口パスが保証されていないと、スレッド数が急増します。

ソケット A からソケット B にデータを転送している場合は、いずれかのソケットが閉じられたときに対応する必要があります。転送を停止し、保留中の書き込みが完了したことを確認して、ソケットを閉じます。

現在の実装では、1 つの転送タスクが閉じられた場合に両方の転送タスクが閉じられることを適切に保証できません。また、タスクを開始して手動リセット イベントでブロックする手法は、タスクがイベントを設定する前に例外を取得した場合に失敗する可能性があります。どちらの場合も、タスクは無限に実行されたままになります。

チェックSocket.Connectedは当然のことのように思えますが、実際には、これは最後の IO 操作で切断が発生したかどうかの単なるキャッシュです。切断の最初の通知である「ゼロ受信」に基づいて行動することを好みます。

NuGet 経由で PowerThreading を使用して、元の同期ルーチンのクイック非同期バージョンを作成しました (これは、フレームワーク 4.5 より前の非同期ルーチンを実行する方法です)。これはTcpListener、CPU 使用率がゼロで、スレッド数が非常に少ない場合に機能します。

これは、async/await を使用してバニラ c# で実行できます...方法はまだわかりません:)

using System;
using System.Collections.Generic;
using System.Text;

namespace AeProxy
{
    using System.IO;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading;
    // Need to install Wintellect.Threading via NuGet for this:
    using Wintellect.Threading.AsyncProgModel;

    class Program
    {
        static void Main(string[] args)
        {
            var ae = new AsyncEnumerator() {SyncContext = null};
            var mainOp = ae.BeginExecute(ListenerFiber(ae), null, null);
            // block until main server is finished
            ae.EndExecute(mainOp);
        }

        static IEnumerator<int> ListenerFiber(AsyncEnumerator ae)
        {
            var listeningServer = new TcpListener(IPAddress.Loopback, 9998);
            listeningServer.Start();
            while (!ae.IsCanceled())
            {
                listeningServer.BeginAcceptTcpClient(ae.End(0, listeningServer.EndAcceptTcpClient), null);
                yield return 1;
                if (ae.IsCanceled()) yield break;
                var clientSocket = listeningServer.EndAcceptTcpClient(ae.DequeueAsyncResult());
                var clientAe = new AsyncEnumerator() { SyncContext = null };
                clientAe.BeginExecute(
                    ClientFiber(clientAe, clientSocket),
                    ar =>
                        {
                            try
                            {
                                clientAe.EndExecute(ar);
                            }
                            catch { }
                    }, null);
            }
        }

        static long clients = 0;

        static IEnumerator<int> ClientFiber(AsyncEnumerator ae, TcpClient clientSocket)
        {
            Console.WriteLine("ClientFibers ++{0}", Interlocked.Increment(ref clients));
            try
            {
                // original code to do handshaking and connect to remote host
                var ns1 = clientSocket.GetStream();
                var r1 = new BinaryReader(ns1);
                var w1 = new BinaryWriter(ns1);

                if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
                var c = r1.ReadByte();
                for (int i = 0; i < c; ++i) r1.ReadByte();
                w1.Write((byte)5);
                w1.Write((byte)0);

                if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
                if (r1.ReadByte() != 0) yield break;

                byte[] ipAddr = null;
                string hostname = null;
                var type = r1.ReadByte();
                switch (type)
                {
                    case 1:
                        ipAddr = r1.ReadBytes(4);
                        break;
                    case 3:
                        hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                        break;
                    case 4:
                        throw new Exception();
                }
                var nhport = r1.ReadInt16();
                var port = IPAddress.NetworkToHostOrder(nhport);
                var socketout = new TcpClient();
                if (hostname != null) socketout.Connect(hostname, port);
                else socketout.Connect(new IPAddress(ipAddr), port);
                w1.Write((byte)5);
                w1.Write((byte)0);
                w1.Write((byte)0);
                w1.Write(type);
                switch (type)
                {
                    case 1:
                        w1.Write(ipAddr);
                        break;
                    case 3:
                        w1.Write((byte)hostname.Length);
                        w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                        break;
                }
                w1.Write(nhport);
                using (var ns2 = socketout.GetStream())
                {
                    var forwardAe = new AsyncEnumerator() { SyncContext = null };
                    forwardAe.BeginExecute(
                        ForwardingFiber(forwardAe, ns1, ns2), ae.EndVoid(0, forwardAe.EndExecute), null);
                    yield return 1;
                    if (ae.IsCanceled()) yield break;
                    forwardAe.EndExecute(ae.DequeueAsyncResult());
                }
            }
            finally
            {
                Console.WriteLine("ClientFibers --{0}", Interlocked.Decrement(ref clients));
            }
        }

        private enum Operation { OutboundWrite, OutboundRead, InboundRead, InboundWrite } 

        const int bufsize = 4096;

        static IEnumerator<int> ForwardingFiber(AsyncEnumerator ae, NetworkStream inputStream, NetworkStream outputStream)
        {
            while (!ae.IsCanceled())
            {
                byte[] outputRead = new byte[bufsize], outputWrite = new byte[bufsize];
                byte[] inputRead = new byte[bufsize], inputWrite = new byte[bufsize];
                // start off output and input reads.
                // NB ObjectDisposedExceptions can be raised here when a socket is closed while an async read is in progress.
                outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
                inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
                var pendingops = 2;
                while (!ae.IsCanceled())
                {
                    // wait for the next operation to complete, the state object passed to each async
                    // call can be used to find out what completed.
                    if (pendingops == 0) yield break;
                    yield return 1;
                    if (!ae.IsCanceled())
                    {
                        int byteCount;
                        var latestEvent = ae.DequeueAsyncResult();
                        var currentOp = (Operation)latestEvent.AsyncState;
                        if (currentOp == Operation.InboundRead)
                        {
                            byteCount = inputStream.EndRead(latestEvent);
                            if (byteCount == 0)
                            {
                                pendingops--;
                                outputStream.Close();
                                continue;
                            }
                            Array.Copy(inputRead, outputWrite, byteCount);
                            outputStream.BeginWrite(outputWrite, 0, byteCount, ae.EndVoid(1, outputStream.EndWrite), Operation.OutboundWrite);
                            inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
                        }
                        else if (currentOp == Operation.OutboundRead)
                        {
                            byteCount = outputStream.EndRead(latestEvent);
                            if (byteCount == 0)
                            {
                                pendingops--;
                                inputStream.Close();
                                continue;
                            }
                            Array.Copy(outputRead, inputWrite, byteCount);
                            inputStream.BeginWrite(inputWrite, 0, byteCount, ae.EndVoid(1, inputStream.EndWrite), Operation.InboundWrite);
                            outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
                        }
                        else if (currentOp == Operation.InboundWrite)
                        {
                            inputStream.EndWrite(latestEvent);
                        }
                        else if (currentOp == Operation.OutboundWrite)
                        {
                            outputStream.EndWrite(latestEvent);
                        }
                    }
                }
            }
        }
    }
}
于 2013-07-30T14:22:10.517 に答える
0

スレッドを生成する代わりに、TcpClient メソッドの非同期バージョンを使用する必要があります。

于 2013-07-29T05:01:41.863 に答える