0

私は現在、同じリッスン ソケットを介したマルチプロセッシングを可能にする websocket の実装に取り​​組んでいます。クアッド コア マシンで 4 つのプロセスを使用して驚異的なパフォーマンスを実現できます。

4 つのリクエストの後、8 つのプロセスのように上に移動すると、epoll.poll はイベントを発生しなくなります。興味深いことに、2 つの異なるポートに 2 つのリスナーを使用して、同じプログラムを実行してみました。リスナーごとに 4 つのプロセスがあるため、ソケットごとに 2 つのリクエストの後にブロックされます。リスナーごとに 2 つのプロセスがあるため、すべてうまくいきます。

何か考えはありますか?

メイン.py (抜粋)

#create the WSServer
wsserver = WSServer(s.bind_ip, s.bind_port, s.max_connections)
# specify on how many process we'll run
wsserver.num_process = s.num_process
Process(target=wsserver.run,args=()).start()

wsserver.py (抜粋)

def serve_forever_epoll(wsserver):
    log(current_process())
    epoll = select.epoll()
    epoll.register(wsserver.socket.fileno(), select.EPOLLIN)
    try:
        client_map = {}
        while wsserver.run:
            events = epoll.poll(1)
            for fileno, event in events:
                if fileno == wsserver.socket.fileno():
                    channel, details = wsserver.socket.accept()
                    channel.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    aclient = wsclient.WSClient(channel, wsserver, process_server.client_manager)
                    client_map[channel.fileno()] = aclient
                    epoll.register(channel.fileno(), select.EPOLLIN )
                    log('Accepting client on %s' % current_process())
                    aclient.do_handshake()

                elif event & select.EPOLLIN:
                        aclient = client_map[fileno]
                        threading.Thread(target=aclient.interact).start()

    except Exception, e:
        log(e)
    finally:
        epoll.unregister(wsserver.socket.fileno())
        epoll.close()
        wsserver.socket.close()

class WSServer():

    def __init__(self, address, port, connections):
        self.address = address
        self.port = port
        self.connections = connections
        self.onopen = onopen
        self.onclose = onclose
        log('server init')
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        #self.socket.setblocking(0)
        self.socket.bind((self.address, int(self.port)))
        self.socket.listen(self.connections)

    def run(self, *args):
        multiprocessing.log_to_stderr(logging.DEBUG)
        log("Run server")
        try:

            log("Starting Server")
            self.run = True
            serve_forever = serve_forever_epoll
            for i in range(self.num_process-1):
                log('Starting Process')
                Process(target=serve_forever,args=(self,)).start()
            serve_forever(self)

        except Exception as e:
            log("Exception-- %s " % e)
            pass
4

1 に答える 1

0

最後に、この奇妙なケースは、私が使用していた別のモジュールが原因でした。どのプロセスがどのクライアントを保持しているかを追跡するためのマネージャーとして Pyro4 を使用しています。これにより、IPC が大幅に簡素化され、いくつかの user_data に基づくクライアント フィルタリングも可能になります。

問題は、Pyro4 デーモンが MainProcess で実行されていたが、メイン スレッドでは実行されていなかったことです!... プロセスが 4 つ未満である限り、すべて問題ありませんでした (理由は聞かないでください)。Pyro をメインプロセス + スレッド イベント ループ内に移動すると、完全に動作しました! これで、同じリッスン ポートに対して 8、16、または 32 のプロセスを達成できるだけでなく、新しい構成を生成して複製したり、websocket サーバーの新しいエンドポイントを公開したりできます。

ご協力いただきありがとうございます。お時間をいただき申し訳ありません...

于 2012-08-21T20:19:19.587 に答える