2

私は Python と Twisted の両方にかなり慣れていないので、物事を正しく理解していないだけかもしれませんが、助けが必要な点で立ち往生しているようです。

私がやりたいことは、SSL 接続でReconnectingClientFactoryを使用することです。私はそれをすべて実行していますが、接続が切断された場合、トランスポートの write() メソッドに送信されたすべてのデータはエラーなしで単純に削除されます。呼び出される実際のメソッドはtwisted.protocols.tls.TLSMemoryBIOProtocol.write()です。

これが私が起こっていると思うことです(動作中の接続から始めて):

クライアントの縮小版は次のとおりです。

from OpenSSL import SSL
from twisted.internet.protocol import (Protocol, ReconnectingClientFactory)
from twisted.internet import (reactor, ssl)
import struct

class MetricsServer(Protocol):
    streambuffer = bytearray()

    def connectionMade(self):
        self.transport.setTcpKeepAlive(True) # maintain the TCP connection
        self.transport.setTcpNoDelay(False) # allow Nagle algorithm
        print("connected to server")            

    def dataReceived(self, data):
        print("from server:", data)

    def connectionLost(self, reason):
        self.connected = 0
        print("server connection lost:", reason)

class MetricsServerFactory(ReconnectingClientFactory):
    protocol = MetricsServer
    maxDelay = 300 # maximum seconds between retries
    factor = 1.6180339887498948
    packet_sequence_number = 0
    active_connection = None

    def buildProtocol(self, addr):
        self.resetDelay()
        if self.active_connection == None:
            self.active_connection = self.protocol()
        return self.active_connection

    def get_packet_sequence_number(self):
        self.packet_sequence_number += 1
        return self.packet_sequence_number

    def send_data(self):
        print ("sending ssl packet")
        packet = struct.pack("!I", self.get_packet_sequence_number())
        self.active_connection.transport.write(packet)
        reactor.callLater(1.0, metrics_server.send_data)

class CtxFactory(ssl.ClientContextFactory):
    def getContext(self):
        self.method = SSL.TLSv1_METHOD
        ctx = ssl.ClientContextFactory.getContext(self)
        ctx.use_certificate_file('keys/client.crt')
        ctx.use_privatekey_file('keys/client.key')

        def verifyCallback(connection, x509, errnum, errdepth, ok):
            return bool(ok)
        ctx.set_verify(SSL.VERIFY_PEER, verifyCallback)
        ctx.load_verify_locations("keys/ca.pem")
        return ctx

if __name__ == "__main__":
    metrics_server = MetricsServerFactory()
    reactor.connectSSL('localhost', 8000, metrics_server, CtxFactory())
    reactor.callLater(3.0, metrics_server.send_data)
    reactor.run()

そして、受信したデータを出力する単純なサーバーを次に示します。

from OpenSSL import SSL
from twisted.internet import ssl, reactor
from twisted.internet.protocol import Factory, Protocol

class Echo(Protocol):
    sent_back_data = False

    def dataReceived(self, data):
        print(' '.join("{0:02x}".format(x) for x in data))

def verifyCallback(connection, x509, errnum, errdepth, ok):
    return bool(ok)

if __name__ == '__main__':
    factory = Factory()
    factory.protocol = Echo

    myContextFactory = ssl.DefaultOpenSSLContextFactory(
        'keys/server.key', 'keys/server.crt'
        )
    ctx = myContextFactory.getContext()
    ctx.set_verify(
        SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
        verifyCallback
        )

    ctx.load_verify_locations("keys/ca.pem")
    reactor.listenSSL(8000, factory, myContextFactory)
    reactor.run()

問題を再現するプロセス:

  • これを機能させるには、まず独自の証明書と CA を生成する必要があります。
  • 最初にサーバーを実行します
  • クライアントコードを実行する
  • サーバー側で何らかの出力を待ってから、プログラムを終了します
  • クライアントがデータの送信を試み続けていることに注意してください
  • サーバー側を再起動します
  • サーバー側はパケットを受信し続けますが、接続が失われたときに送信されたパケットは単にドロップされることに注意してください

回避策として、再接続時にデータを送信する独自​​のバッファーを実装しようとしましたが、別の問題に遭遇しました。接続が再確立されたときにデータを送信したいのですが、表示できる唯一のフックは Protocol.connectionMade() です。ただし、そのメソッドはTLS ハンドシェークが実際に行われる前に呼び出されるため、_write() の例外ハンドラーによってキャッチされ、後で送信される別のバッファーに配置されます。 しかし、そのバッファは、データが相手から受信された場合にのみ送信されるようです(私の場合、これはあまり頻繁には発生しません。また、データが受信される前に write() が呼び出される可能性があるため、データが間違った順序で反対側に到着する可能性があることも意味します)。また、データを受信する前に別の切断を行うと、データのバッファが消去されると思います。

編集: 最初の問題のサンプル コードを追加しました。私が Factory にそれを持っているのはおそらく奇妙ですが、私はactive_connectionそれをシングルトンとして機能させようとしています。

4

1 に答える 1

0

さて、私は回避策の問題を理解しました...bytearrayトランスポートに書き込むためにaを渡し、その後すぐにクリアしましたが、バッファをクリアするまで書き込みが延期されていることに気づきませんでした。そのため、のコピーを渡すと、bytearray現在は機能しているようです。

write へのすべての呼び出しが接続されているかどうかを確認するためにチェックを続行する必要があるというのは、まだ完全には正しくないようReconnectingClientFactoryです。ifまた、そのステートメントとwrite()実際に実行されたときに接続が失われる可能性があるため、データが失われる可能性があると思います。

from OpenSSL import SSL
from twisted.internet.protocol import (Protocol, ReconnectingClientFactory)
from twisted.internet import (reactor, ssl)
import struct

class MetricsServer(Protocol):
    streambuffer = bytearray()

    def connectionMade(self):
        self.transport.setTcpKeepAlive(True) # maintain the TCP connection
        self.transport.setTcpNoDelay(False) # allow Nagle algorithm
        print("connected to server")
        if len(self.transport.factory.wrappedFactory.send_buffer) > 0:
            self.transport.write(bytes(self.transport.factory.wrappedFactory.send_buffer))
            self.transport.factory.wrappedFactory.send_buffer.clear()

    def dataReceived(self, data):
        print("from server:", data)

    def connectionLost(self, reason):
        self.connected = 0
        print("server connection lost:", reason)


class MetricsServerFactory(ReconnectingClientFactory):
    protocol = MetricsServer
    maxDelay = 300 # maximum seconds between retries
    factor = 1.6180339887498948
    packet_sequence_number = 0
    active_connection = None

    send_buffer = bytearray()

    def buildProtocol(self, addr):
        self.resetDelay()
        if self.active_connection == None:
            self.active_connection = self.protocol()
        return self.active_connection

    def get_packet_sequence_number(self):
        self.packet_sequence_number += 1
        return self.packet_sequence_number

    def send_data(self):
        print ("sending ssl packet")
        packet = struct.pack("!I", self.get_packet_sequence_number())
        if self.active_connection and self.active_connection.connected:
            self.active_connection.transport.write(packet)
        else:
            self.send_buffer.extend(packet)
        reactor.callLater(1.0, metrics_server.send_data)


class CtxFactory(ssl.ClientContextFactory):
    def getContext(self):
        self.method = SSL.TLSv1_METHOD
        ctx = ssl.ClientContextFactory.getContext(self)
        ctx.use_certificate_file('keys/client.crt')
        ctx.use_privatekey_file('keys/client.key')

        def verifyCallback(connection, x509, errnum, errdepth, ok):
            return bool(ok)
        ctx.set_verify(SSL.VERIFY_PEER, verifyCallback)
        ctx.load_verify_locations("keys/ca.pem")
        return ctx

if __name__ == "__main__":
    metrics_server = MetricsServerFactory()
    reactor.connectSSL('localhost', 8000, metrics_server, CtxFactory())
    reactor.callLater(3.0, metrics_server.send_data)
    reactor.run()
于 2013-01-03T16:02:19.620 に答える