2

多くの並列 Web ソケット接続を開くことをシミュレートするためにモジュールを実行していmassconnect.pyますが、しばらくして 15000 接続 (massconnect json で指定) を開こうとすると、次のエラー メッセージが表示されます。

Unhandled Error
Traceback (most recent call last):
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 88, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 73, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "C:\Python27\lib\site-packages\twisted\internet\iocpreactor\reactor.py", line 120, in _callEventCallback
    evt.callback(rc, bytes, evt)
  File "C:\Python27\lib\site-packages\twisted\internet\iocpreactor\tcp.py", line 285, in cbConnect
    self.socket.setsockopt(
exceptions.AttributeError: 'Client' object has no attribute 'socket'

Web ソケット接続の開始が中断され、この例外が発生します。

massconnect.pyアプリの Web ソケット認証を確認するようにモジュールを調整しました。クライアントが割り当てられるたびに一意の認証メッセージを送信するようにテストしています。

すでにMaxUserPortレジストリを 65534 に設定しています。この問題によりメモリが不足するのではないかと考えていましたが、別のマシンで調べたところ、これが原因ではないことがわかりました。

これは既知の python/twisted の問題でしょうか?

@グリフ:

massconnect.py のソース コードは次のとおりです。

'''
Created on Jan 29, 2013

@author: boro.petrovic
'''
###############################################################################
##
##  Copyright 2011,2012 Tavendo GmbH
##
##  Licensed under the Apache License, Version 2.0 (the "License");
##  you may not use this file except in compliance with the License.
##  You may obtain a copy of the License at
##
##      http://www.apache.org/licenses/LICENSE-2.0
##
##  Unless required by applicable law or agreed to in writing, software
##  distributed under the License is distributed on an "AS IS" BASIS,
##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
##  See the License for the specific language governing permissions and
##  limitations under the License.
##
###############################################################################

import sys
import time
from twisted.internet import reactor
from twisted.internet import error
from twisted.internet.defer import Deferred, returnValue, inlineCallbacks
from twisted.internet.interfaces import IReactorTime
from autobahn.websocket import connectWS
from autobahn.websocket import WebSocketClientFactory, WebSocketClientProtocol
import time

f='start'

class MassConnectProtocol(WebSocketClientProtocol):   

    def sendHello(self, message):
        #self.sendMessage("Hello from Python!")
        self.sendMessage(payload=message)
        #reactor.callLater(2, self.sendHello(message='test'))

    def onOpen(self):
        global f
        wstoken = f.readline().rstrip()
        message = 'messagecontent'+wstoken
        self.factory.test.onConnected()
        self.sendHello(message)
        self.wstoken = wstoken

    def onMessage(self, msg, binary):
        print "Got message from wstoken " + self.wstoken + " :" + msg

class MassConnectFactory(WebSocketClientFactory):

    def clientConnectionFailed(self, connector, reason):
        if self.test.onFailed():
            reactor.callLater(float(self.retrydelay)/1000., connector.connect)

    def clientConnectionLost(self, connector, reason):
        if self.test.onLost():
            reactor.callLater(float(self.retrydelay)/1000., connector.connect)

class MassConnect:

    def __init__(self, name, uri, connections, batchsize, batchdelay, retrydelay):
        self.name = name
        self.uri = uri
        self.batchsize = batchsize
        self.batchdelay = batchdelay
        self.retrydelay = retrydelay
        self.failed = 0
        self.lost = 0
        self.targetCnt = connections
        self.currentCnt = 0
        self.actual = 0

    def run(self):
        self.d = Deferred()
        self.started = time.clock()
        self.connectBunch()
        return self.d

    def onFailed(self):
        self.failed += 1
        return True

    def onLost(self):
        self.lost += 1
        return True

    def onConnected(self):
        #sprint "connect"
        self.actual += 1
        if self.actual % self.batchsize == 0:
            sys.stdout.write(".")
        if self.actual == self.targetCnt:
            self.ended = time.clock()
            duration = self.ended - self.started
            print " connected %d clients to %s at %s in %s seconds (retries %d = failed %d + lost %d)" % (self.currentCnt, self.name, self.uri, duration, self.failed + self.lost, self.failed, self.lost)
            try:
                reactor.run()
            except twisted.internet.error.ReactorAlreadyRunning:
                pass

            result = {'name': self.name,
                  'uri': self.uri,
                  'connections': self.targetCnt,
                  'retries': self.failed + self.lost,
                  'lost': self.lost,
                  'failed': self.failed,
                  'duration': duration}
            self.d.callback(result)

    def connectBunch(self):

        if self.currentCnt + self.batchsize < self.targetCnt:
            c = self.batchsize
            redo = True
        else:
            c = self.targetCnt - self.currentCnt
            redo = False
        for i in xrange(0, c):
            factory = MassConnectFactory(self.uri, origin=None, protocols=[("myprotocol")])
            factory.test = self
            factory.retrydelay = self.retrydelay
            factory.protocol = MassConnectProtocol
            factory.setProtocolOptions(version=13)
            factory.setSessionParameters(url="myurl", origin=None, protocols=["myprotocol"])
            connectWS(factory)
            self.currentCnt += 1
        if redo:
            reactor.callLater(float(self.batchdelay)/1000., self.connectBunch)

class MassConnectTest:

    def __init__(self, spec):
        global f
        f = open("C:/tokens.txt", "r")
        self.spec = spec

    @inlineCallbacks
    def run(self):
        global f
        res = []
        for s in self.spec['servers']:
            t = MassConnect(s['name'],
                            s['uri'],
                            self.spec['options']['connections'],
                            self.spec['options']['batchsize'],
                            self.spec['options']['batchdelay'],
                            self.spec['options']['retrydelay'])
            r = yield t.run()
            res.append(r)
        returnValue(res)
        f.close()

スクリプトでは、機密データをダミー文字列に置き換えました。

token.txt ファイルから、Web ソケット経由でテスト対象のアプリケーションに接続している 15000 人の各ユーザーの認証トークンを読み取っています。このようにして、多くの異なるユーザーの接続をシミュレートし、それぞれに対してアクティブな独立した Web ソケット接続を持っています。

残念ながら、スクリプトの実行中 (接続の作成中) に Unhandled Error メッセージが表示されたため、15000 の接続がすべて確立されるまでには至りませんでした。

4

1 に答える 1

0

SSCCEに縮小massconnect.pyしていただけると助かります。ただし、問題が数千の同時接続のレベルでのみ発生する場合は、ファイル記述子が不足している可能性があります (これに対する Twisted のエラー メッセージは適切ではありません)。

ご使用のオペレーティング システムでこのパラメーターを変更する方法については、この質問を参照してください。

于 2015-10-26T22:10:52.393 に答える