3

サーバーに接続し、サーバーにさまざまなコマンドを非同期的に送信し、返されたデータをクライアントに提供するクラスを作成するように依頼されました。私にとって新しい言語であるPythonでこれを行うように依頼されました。私は掘り下げ始めて、自分のソケットベースのアプリをロールする場合にやらなければならないことの多くを実行するいくつかの非常に優れた抽象化(Protocol、ProtocolFactory、Reactor)を提供するTwistedフレームワークを見つけました。私が解決しなければならない問題を考えると、それは正しい選択のようです。

Web(主にKrondo)で多数の例を調べましたが、複数のコマンドをネットワーク経由で送信するクライアントを作成する良い例はまだ見ていません。作成した接続を維持しています。この場合、サーバー(私は制御できません)は、応答を送信した後も切断しません。では、さまざまな方法でサーバーをくすぐることができるようにクライアントを設計する適切な方法は何ですか?

今私はこれをします:

class TestProtocol(Protocol)
    def connectionMade(self):
         self.transport.write(self.factory.message)

class TestProtocolFactory(Factory):
    message = ''
    def setMessage(self, msg):
        self.message = msg

def main():
    f = TestProtocolFactory()
    f.setMessage("my message")
    reactor.connectTCP(...)
    reactor.run()

私が本当にやりたいのはself.transport.write(...)、接続が確立されたときだけでなく、reactorを介して呼び出すことです(実際には、別の実行スレッドからオンデマンドでTestProtocolFactory :: setMessage()を呼び出します)。

4

3 に答える 3

4

依存します。ここにいくつかの可能性があります:

私は仮定しています

アプローチ1.サーバーに送信するコマンドのリストがあり、何らかの理由で一度にすべてを実行することはできません。その場合、前の回答が返されるので、新しいものを送信します。

class proto(parentProtocol):
    def stringReceived(self, data):
        self.handle_server_response(data)
        next_command = self.command_queue.pop()
        # do stuff

アプローチ2.サーバーに送信する内容は、サーバーが送信する内容に基づいています。

class proto(parentProtocol):
    def stringReceived(self, data):
        if data == "this":
            self.sendString("that")
        elif data == "foo":
            self.sendString("bar")
        # and so on

アプローチ3.サーバーが何に送信するかは気にせず、定期的にいくつかのコマンドを送信したいだけです。

class proto(parentProtocol):
    def callback(self):
        next_command = self.command_queue.pop()
        # do stuff
    def connectionMade(self):
        from twisted.internet import task
        self.task_id = task.LoopingCall(self.callback)
        self.task_id.start(1.0)

アプローチ4:編集では、別のスレッドからのトリガーについて言及しています。ねじれたドキュメントをチェックして、proto.sendStringスレッドセーフかどうかを確認してください。直接電話できるかもしれませんが、わかりません。ただし、アプローチ3 スレッドセーフです。別のスレッドからキュー(スレッドセーフ)を埋めるだけです。

基本的に、プロトコルには任意の量の状態を保存できます。完了するまでそれは残ります。あなたはあなたへのメッセージへの応答としてサーバーにコマンドを送るか、あなたはあなたの仕事をするためにいくつかのスケジュールを設定します。または両方。

于 2012-05-09T19:34:22.007 に答える
4

サービスを使用することをお勧めします。

サービスは、開始および停止されるTwistedアプリ内の機能の一部であり、コードの他の部分が対話するための優れた抽象化です。たとえば、この場合、SayStuffToServerService(ひどい名前ですが、その仕事について詳しく知らなくても、ここでできる最善の方法でした:))があり、次のようなものが公開されている可能性があります。

class SayStuffToServerService:
    def __init__(self, host, port):
        # this is the host and port to connect to

    def sendToServer(self, whatToSend):
        # send some line to the remote server

    def startService(self):
        # call me before using the service. starts outgoing connection efforts.

    def stopService(self):
        # clean reactor shutdowns should call this method. stops outgoing
        # connection efforts.

(これで必要なインターフェイスはすべてこれで十分かもしれませんが、どこに追加できるかはかなり明確になっているはずです。)

ここでのメソッドstartService()stopService()メソッドは、Twistedのサービスが公開しているものです。そして便利なことに、TCPクライアントのように機能し、すべてのリアクターの処理を行う、事前に作成されたツイストサービスがあります。これtwisted.application.internet.TCPClientは、実際の接続試行の処理を処理するProtocolFactoryとともに、リモートホストとポートの引数を取ります。

これがSayStuffToServerServiceで、次のサブクラスとして実装されていTCPClientます。

from twisted.application import internet

class SayStuffToServerService(internet.TCPClient):
    factoryclass = SayStuffToServerProtocolFactory

    def __init__(self, host, port):
        self.factory = self.factoryclass()
        internet.TCPClient.__init__(self, host, port, self.factory)

    def sendToServer(self, whatToSend):
        # we'll do stuff here

(SayStuffToServerProtocolFactoryについては、以下を参照してください。)

このサービスアーキテクチャの使用は、多くの点で便利です。サービスを1つのコンテナーにグループ化すると、アプリのさまざまな部分をアクティブにしたいときに、サービスがすべて停止して1つとして開始されます。アプリの他の部分を個別のサービスとして実装することは理にかなっている場合があります。サービスを子サービスとして設定できます-アプリを初期化、デーモン化、およびシャットダウンする方法を知るために検索applicationする魔法の名前。twistd実際にはそうです、今それを行うためにいくつかのコードを追加しましょう。

from twisted.application import service

...

application = service.Application('say-stuff')

sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))

それで全部です。このモジュールをtwistd(つまり、デバッグのためにtwistd -noy saystuff.py)で実行すると、適切なリアクターの下で開始され、SayStuffToServerServiceが開始されます。これにより、サービスの属性applicationを使用するlocalhost:65432への接続が開始されます。factory接続とプロトコルを設定します。reactor.run()もう自分で原子炉に電話したり、物を取り付けたりする必要はありません。

したがって、SayStuffToServerProtocolFactoryはまだ実装していません。クライアントが接続を失った場合は再接続することをお勧めします(sendToServer通常、の呼び出し元は接続が機能していると見なすことができます)ので、このプロトコルファクトリをに追加しReconnectingClientFactoryます。

from twisted.internet import protocol

class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    _my_live_proto = None
    protocol = SayStuffToServerProtocol

これは非常に優れた最小限の定義であり、指定したホストとポートへの発信TCP接続を試行し続け、毎回SayStuffToServerProtocolをインスタンス化します。接続に失敗した場合、このクラスは、ネットワークが破壊されないように、適切に動作する指数バックオフを実行します(最大待機時間を設定できます)。指数バックオフが期待どおりに機能し続けるように、_my_live_protoこのファクトリのメソッドを割り当てて呼び出すのはプロトコルの責任です。resetDelay()そして、これがそのプロトコルです。

class SayStuffToServerProtocol(basic.LineReceiver):
    def connectionMade(self):
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self

    def connectionLost(self, reason):
        self.factory._my_live_proto = None
        del self.factory

    def sayStuff(self, stuff):
        self.sendLine(stuff)

    def lineReceived(self, line):
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass

これは上に実装されtwisted.protocols.basic.LineReceiverますが、プロトコルが行指向でない場合は、他の種類のプロトコルでも同様に機能します。

残っているのは、サービスを適切なプロトコルインスタンスに接続することだけです。これが、ファクトリが_my_live_proto属性を保持する理由です。この属性は、接続が正常に確立されたときに設定され、接続が失われたときにクリア(Noneに設定)される必要があります。これがの新しい実装ですSayStuffToServerService.sendToServer

class NotConnectedError(Exception):
    pass

class SayStuffToServerService(internet.TCPClient):

    ...

    def sendToServer(self, whatToSend):
        if self.factory._my_live_proto is None:
            # define here whatever behavior is appropriate when there is no
            # current connection (in case the client can't connect or
            # reconnect)
            raise NotConnectedError
        self.factory._my_live_proto.sayStuff(whatToSend)

そして今、それをすべて1つの場所にまとめます。

from twisted.application import internet, service
from twisted.internet import protocol
from twisted.protocols import basic

class SayStuffToServerProtocol(basic.LineReceiver):
    def connectionMade(self):
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self

    def connectionLost(self, reason):
        self.factory._my_live_proto = None
        del self.factory

    def sayStuff(self, stuff):
        self.sendLine(stuff)

    def lineReceived(self, line):
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass

class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    _my_live_proto = None
    protocol = SayStuffToServerProtocol

class NotConnectedError(Exception):
    pass

class SayStuffToServerService(internet.TCPClient):
    factoryclass = SayStuffToServerProtocolFactory

    def __init__(self, host, port):
        self.factory = self.factoryclass()
        internet.TCPClient.__init__(self, host, port, self.factory)

    def sendToServer(self, whatToSend):
        if self.factory._my_live_proto is None:
            # define here whatever behavior is appropriate when there is no
            # current connection (in case the client can't connect or
            # reconnect)
            raise NotConnectedError
        self.factory._my_live_proto.sayStuff(whatToSend)

application = service.Application('say-stuff')

sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))

うまくいけば、それは開始するための十分なフレームワークを提供します。クライアントの切断を希望どおりに処理したり、サーバーからの異常な応答を処理したり、さまざまな種類のタイムアウトを処理したり、保留中の要求をキャンセルしたり、複数のプールされた接続を許可したりするために、多くの配管が必要になる場合があります。などですが、これは役立つはずです。

于 2012-05-14T20:45:58.100 に答える
0

ツイストフレームワークはイベントベースのプログラミングです。そして本質的に、そのメソッドはすべて非同期で呼び出され、結果はdeferオブジェクトによって取得されます。

フレームワークの性質はプロトコル開発に適しています。従来のシーケンシャルプログラミングから考えを変える必要があります。Protocolクラスは、接続の確立、接続の喪失、データの受信などのイベントを伴う有限状態マシンのようなものです。クライアントコードをFSMに変換すると、Protocolクラスに簡単に適合できます。

以下は私が表現したいことの大まかな例です。少しルージュですが、これは私が今提供できるものです:

class SyncTransport(Protocol):
    # protocol
    def dataReceived(self, data):
        print 'receive data', data
    def connectionMade(self):
        print 'i made a sync connection, wow'
        self.transport.write('x')
        self.state = I_AM_LIVING
    def connectionLost(self):
        print 'i lost my sync connection, sight'
    def send(self, data):
        if self.state == I_AM_LIVING:
            if data == 'x':
              self.transport.write('y')
           if data == 'Y':
              self.transport.write('z')
              self.state = WAITING_DEAD
        if self.state == WAITING_DEAD:
              self.transport.close()
于 2013-04-03T06:09:35.027 に答える