MyProtocol
クラスをのぞき見ることなく、何が起こっているのかを正確に言うのは難しい. この問題は、低レベルの関数を直接いじっているため、WebSocket 接続の内部状態の表現であるクラスのstate
属性を直接いじっているという事実によって引き起こされているように思えます。WebSocket
アウトバーンのリファレンス docによると、WebSicketProtocol
直接使用してオーバーライドできるの APIは次のとおりです。
- onOpen
- onMessage
- onClose
- メッセージを送る
- 送信閉じる
StringTransport
を使用してプロトコルをテストするというアプローチは理想的ではありません。問題は、autobahn が提供するフレームワークMyProtocol
の上にある小さなレイヤーでWebSocketProtocol
あり、良くも悪くも、接続、トランスポート、および内部プロトコル状態の管理に関する詳細を隠しているという事実にあります。
考えてみると、自分のものをテストしWebSocketProtocol
たいのであって、ダミーのサーバーやクライアントを埋め込みたくない場合は、MyProtocol
オーバーライドするメソッドを直接テストするのが最善の策です。
私が言っていることの例は次のとおりです
class MyPublisher(object):
cbk=None
def publish(self, msg):
if self.cbk:
self.cbk(msg)
class MyProtocol(WebSocketServerProtocol):
def __init__(self, publisher):
WebSocketServerProtocol.__init__(self)
#Defining callback for publisher
publisher.cbk = self.sendMessage
def onMessage(self, msg, binary)
#Stupid echo
self.sendMessage(msg)
class NotificationTest(unittest.TestCase):
class MyProtocolFactory(WebSocketServerFactory):
def __init__(self, publisher):
WebSocketServerFactory.__init__(self, "ws://127.0.0.1:8081")
self.publisher = publisher
self.openHandshakeTimeout = None
def buildProtocol(self, addr):
protocol = MyProtocol(self.listener)
protocol.factory = self
protocol.websocket_version = 13 #Hybi version 13 is supported by pretty much everyone (apart from IE <8 and android browsers)
return protocol
def setUp(self):
publisher = task.LoopingCall(self.send_stuff, "Hi there")
factory = NotificationTest.MyProtocolFactory(listener)
protocol = factory.buildProtocol(None)
transport = proto_helpers.StringTransport()
def play_dumb(*args): pass
setattr(transport, "setTcpNoDelay", play_dumb)
protocol.makeConnection(transport)
self.protocol, self.transport, self.publisher, self.fingerprint_handler = protocol, transport, publisher, fingerprint_handler
def test_onMessage(self):
#Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with!
self.protocol.state = WebSocketProtocol.STATE_OPEN
self.protocol.websocket_version = 13
self.protocol.onMessage("Whatever")
self.assertEqual(self.transport.value()[2:], 'Whatever')
def test_push(self):
#Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with!
self.protocol.state = WebSocketProtocol.STATE_OPEN
self.protocol.websocket_version = 13
self.publisher.publish("Hi there")
self.assertEqual(self.transport.value()[2:], 'Hi There')
お気づきかもしれませんが、StringTransport
here の使用は非常に面倒です。下線フレームワークの知識が必要であり、その状態管理をバイパスする必要があります。これは、実際にはやりたくないことです。残念ながらアウトバーンは、状態を簡単に操作できるすぐに使えるテスト オブジェクトを提供していないため、ダミーのサーバーとクライアントを使用するという私の提案は依然として有効です。
サーバーをネットワークでテストする
提供されているテストは、サーバー プッシュをテストする方法を示しています。取得しているものは期待どおりであることをアサートし、いつ終了するかを決定する方法についてのフックも使用します。
サーバープロトコル
from twisted.trial.unittest import TestCase as TrialTest
from autobahn.websocket import WebSocketServerProtocol, WebSocketServerFactory, WebSocketClientProtocol, WebSocketClientFactory, connectWS, listenWS
from twisted.internet.defer import Deferred
from twisted.internet import task
START="START"
class TestServerProtocol(WebSocketServerProtocol):
def __init__(self):
#The publisher task simulates an event that triggers a message push
self.publisher = task.LoopingCall(self.send_stuff, "Hi there")
def send_stuff(self, msg):
#this method sends a message to the client
self.sendMessage(msg)
def _on_start(self):
#here we trigger the task to execute every second
self.publisher.start(1.0)
def onMessage(self, message, binary):
#According to this stupid protocol, the server starts sending stuff when the client sends a "START" message
#You can plug other commands in here
{
START : self._on_start
#Put other keys here
}[message]()
def onClose(self, wasClean, code, reason):
#After closing the connection, we tell the task to stop sending messages
self.publisher.stop()
クライアントのプロトコルとファクトリ
次のクラスはクライアント プロトコルです。基本的に、サーバーにメッセージのプッシュを開始するように指示します。それらの を呼び出してclose_condition
、接続を閉じる時が来たかどうかを確認し、最後に、受信したメッセージの関数を呼び出してassertion
、テストが成功したかどうかを確認します。
class TestClientProtocol(WebSocketClientProtocol):
def __init__(self, assertion, close_condition, timeout, *args, **kwargs):
self.assertion = assertion
self.close_condition = close_condition
self._received_msgs = []
from twisted.internet import reactor
#This is a way to set a timeout for your test
#in case you never meet the conditions dictated by close_condition
self.damocle_sword = reactor.callLater(timeout, self.sendClose)
def onOpen(self):
#After the connection has been established,
#you can tell the server to send its stuff
self.sendMessage(START)
def onMessage(self, msg, binary):
#Here you get the messages pushed from the server
self._received_msgs.append(msg)
#If it is time to close the connection
if self.close_condition(msg):
self.damocle_sword.cancel()
self.sendClose()
def onClose(self, wasClean, code, reason):
#Now it is the right time to check our test assertions
self.assertion.callback(self._received_msgs)
class TestClientProtocolFactory(WebSocketClientFactory):
def __init__(self, assertion, close_condition, timeout, **kwargs):
WebSocketClientFactory.__init__(self, **kwargs)
self.assertion = assertion
self.close_condition = close_condition
self.timeout = timeout
#This parameter needs to be forced to None to not leave the reactor dirty
self.openHandshakeTimeout = None
def buildProtocol(self, addr):
protocol = TestClientProtocol(self.assertion, self.close_condition, self.timeout)
protocol.factory = self
return protocol
トライアルベースのテスト
class WebSocketTest(TrialTest):
def setUp(self):
port = 8088
factory = WebSocketServerFactory("ws://localhost:{}".format(port))
factory.protocol = TestServerProtocol
self.listening_port = listenWS(factory)
self.factory, self.port = factory, port
def tearDown(self):
#cleaning up stuff otherwise the reactor complains
self.listening_port.stopListening()
def test_message_reception(self):
#This is the test assertion, we are testing that the messages received were 3
def assertion(msgs):
self.assertEquals(len(msgs), 3)
#This class says when the connection with the server should be finalized.
#In this case the condition to close the connectionis for the client to get 3 messages
class CommunicationHandler(object):
msg_count = 0
def close_condition(self, msg):
self.msg_count += 1
return self.msg_count == 3
d = Deferred()
d.addCallback(assertion)
#Here we create the client...
client_factory = TestClientProtocolFactory(d, CommunicationHandler().close_condition, 5, url="ws://localhost:{}".format(self.port))
#...and we connect it to the server
connectWS(client_factory)
#returning the assertion as a deferred purely for demonstration
return d
これは明らかに単なる例ですが、ご覧のとおり、明示的にいじる必要はありませんmakeConnection
でしtransport
た