チャットサーバーを作成していますが、単体テスト中に次の問題が発生しました。単体テストの1つで、多くのテストクライアントをサーバーに接続します。接続されているユーザーの数が511に達すると、サーバーはエラーメッセージなしで応答を停止します。この段階では、すべてがPC上でローカルに実行されます。
フォーラムに貼り付けるための簡単なサーバー、テストクライアント、および単体テストコードを用意しました。
サーバーがハングアップする理由はありますか?どんな助けでも大歓迎です
このコードは基本的に、ねじれた単純なチャットチュートリアルからのものです。シンプルなサーバー:
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
class Chat(LineReceiver):
def __init__(self, users, userNum):
self.users = users
self.userNum = userNum
def connectionMade(self):
print "Connected to user %d" % (self.userNum)
self.users[self.userNum] = self
def connectionLost(self, reason):
print "Connection to user %d lost" % (self.userNum)
if self.users.has_key(self.userNum):
del self.users[self.userNum]
def lineReceived(self, line):
for user in self.users:
if user == self.userNum:
continue
self.users[user].sendLine("%d - %s" % (self.userNum, line))
class ChatFactory(Factory):
def __init__(self):
self.users = {} # maps user names to Chat instances
self.nUsers = 0
def buildProtocol(self, addr):
self.nUsers += 1
return Chat(self.users, self.nUsers)
def clientConnectionFailed(self, connector, reason):
print 'connection failed:', reason.getErrorMessage()
reactor.stop()
def clientConnectionLost(self, connector, reason):
print 'connection lost:', reason.getErrorMessage()
reactor.stop()
reactor.listenTCP(8123, ChatFactory())
reactor.run()
これは私のテストクライアントです。このクライアントは、単体テストによって数回インスタンス化されます。
import socket
HOST = "localhost"
PORT = 8123
class TestClient:
def __init__(self):
self.connected = False
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error, msg:
print("Socket error %s" % msg)
def connect(self):
try:
self.socket.settimeout(10)
self.socket.connect((HOST, PORT))
self.connected = True
except socket.error, msg:
print("Socket error %s" % msg)
self.connected = False
def disconnect(self):
self.socket.close()
def connected(self):
return self.connected
最後に、単体テストコードファイル:
import unittest
from TestClient import TestClient
class TestSequenceFunctions(unittest.TestCase):
def test_manyUsers(self):
users = []
number_of_users = 1000
for u in range(number_of_users):
# create client
users.append(TestClient())
# connect client to server
users[-1].connect()
# check connected state
self.assertTrue(users[-1].connected, "User %d is not connected" % (u))
# close connection of all users
for user in users:
user.disconnect()
if __name__ == '__main__':
unittest.main()