5

そのため、大規模なクライアント/サーバー システムを実装するための代替案についていくつかの調査を開始しています。現在、アプリケーション フレームワークの多くに Poco を使用しているため、Poco の Reactor フレームワークを検討しています。

着信パケットのサイズはかなり小さくなるので、クライアントからデータを読み取るという観点からは問題なく機能すると思います。ただし、クライアント入力に基づいて実行される操作は比較的コストがかかり、別のプロセスまたは別のサーバーにオフロードする必要がある場合があります。また、クライアントに返される応答は、かなり大きい場合があります。したがって、明らかに、それが行われている間、リアクター スレッドをブロックすることはできません。

したがって、リアクター イベント ハンドラーでデータを読み取ってから、データを処理する別のスレッド (プール) に渡すと、うまくいくと思います。

私がよくわからないのは、操作が完了したときにクライアントに応答を返すプロセスです。

フレームワークの最適な使用方法に関する情報があまり見つかりません。しかし、いくつかのテストを行ったところ、ソケットが書き込み可能である間、リアクターが WritableNotification イベントを繰り返し発生させるようです。最適なプロセスは、WritableNotification イベントを受信するオブジェクトで送信する必要があるデータをキューに入れ、イベントを受信するたびに小さなチャンクを送信することでしょうか?

更新: したがって、これをテストし始めたとき、サーバーの CPU 使用率が、サーバー アプリが単一の接続で実行されている CPU で 100% に達したことを発見してぞっとしました。しかし、掘り下げた後、私は自分が間違っていたことを発見しました。サービス ハンドラの作成時に WritableNotification イベントに登録する必要がないことを発見しました。登録する必要があるのは、送信するデータがある場合のみです。次に、すべてのデータが送信されたら、イベント ハンドラーの登録を解除する必要があります。このようにして、リアクターは、送信するものが何もないときに、イベント ハンドラーを何度も呼び出し続ける必要がありません。現在、100 接続でも CPU 使用率はほぼ 0 のままです。うわー!

4

1 に答える 1

4

SocketConnector からコピーした ServerConnector クラスを作成しましたが、ソケットの接続を呼び出さないでください。ソケットは既に接続されているためです。TcpServerConnection の run() 関数で通知用の ServiceHandler を使用してリアクターが開始された場合、TcpServer クラスは新しいスレッドを開始します。だから、私はリアクターパートテンのマルチスレッドを手に入れましたが、それが最善の方法であるかどうかはわかりません。

クラス ServerConnector

template <class ServiceHandler>
class ServerConnector
{
public:     
    explicit ServerConnector(StreamSocket& ss):
        _pReactor(0),
        _socket(ss)
        /// Creates a ServerConnector, using the given Socket.
    {
    }

    ServerConnector(StreamSocket& ss, SocketReactor& reactor):
        _pReactor(0),
        _socket(ss)
        /// Creates an acceptor, using the given ServerSocket.
        /// The ServerConnector registers itself with the given SocketReactor.
    {
        registerConnector(reactor);
        onConnect();
    }

    virtual ~ServerConnector()
        /// Destroys the ServerConnector.
    {
        unregisterConnector();
    }

//
// this part is same with SocketConnector
//

private:
    ServerConnector();
    ServerConnector(const ServerConnector&);
    ServerConnector& operator = (const ServerConnector&);

    StreamSocket&   _socket;
    SocketReactor* _pReactor;
};

Echo-Service は一般的な ServiceHandler です

class EchoServiceHandler
{
public:
    EchoServiceHandler(StreamSocket& socket, SocketReactor& reactor):
        _socket(socket),
        _reactor(reactor)
    {
        _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
        _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
    }

    ~EchoServiceHandler()
    {
        _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
        _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
    }

    void onReadable(ReadableNotification* pNf)
    {
        pNf->release();
        char buffer[4096];
        try {
            int n = _socket.receiveBytes(buffer, sizeof(buffer));
            if (n > 0)
            {
                _socket.sendBytes(buffer, n);
            } else
                onError();
        } catch( ... ) {
            onError();
        }
    }

    void onError(ErrorNotification* pNf)
    {
        pNf->release();
        onError();
    }

    void onError()
    {
        _socket.shutdown();
        _socket.close();
        _reactor.stop();
        delete this;
    }

private:
    StreamSocket   _socket;
    SocketReactor& _reactor;
};

EchoReactorConnection はクラス TcpServer と連携して、reactor をスレッドとして実行します

class EchoReactorConnection: public TCPServerConnection
{
public:
    EchoReactorConnection(const StreamSocket& s): TCPServerConnection(s)
    {
    }

    void run()
    {
        StreamSocket& ss = socket();
        SocketReactor reactor;

        ServerConnector<EchoServiceHandler> sc(ss, reactor);
        reactor.run();
        std::cout << "exit EchoReactorConnection thread" << std::endl;
    }
};

cppunit テスト ケースは TCPServerTest::testMultiConnections と同じですが、マルチスレッドに EchoReactorConnection を使用します。

void TCPServerTest::testMultithreadReactor()
{
    ServerSocket svs(0);
    TCPServerParams* pParams = new TCPServerParams;
    pParams->setMaxThreads(4);
    pParams->setMaxQueued(4);
    pParams->setThreadIdleTime(100);

    TCPServer srv(new TCPServerConnectionFactoryImpl<EchoReactorConnection>(), svs, pParams);
    srv.start();

    assert (srv.currentConnections() == 0);
    assert (srv.currentThreads() == 0);
    assert (srv.queuedConnections() == 0);
    assert (srv.totalConnections() == 0);

    //
    // same with TCPServerTest::testMultiConnections()
    //
    // ....
    ///
}
于 2012-10-24T15:51:33.323 に答える