11

app.py で asyncio を使用して 2 つのプロトコル (TcpClient と UdpServer) を作成したいと思います。ここで、TcpClient は server.py と UdpServer との永続的な接続を UDP サーバーとして提供します。

私が必要とするもの:
a)両方のプロトコルが通信すること:メソッドを互いに呼び出すこと。これは最初の接続でのみ機能しています。TcpClient が再接続した場合、「send to tcp」という文字列を再度送信することはできません。UdpServer から来ています。私はチェックしprint(self)て、TcpClientが新しいインスタンスを作成し、古いものはまだ存在しますが、接続していませんが、それをリファクタリングする方法がわかりません。私は asyncio を間違った方法で使用していると思います。
b) TcpClient が server.py から切断されたら、5 秒待ってから再接続を試みます。私はasyncioを使用しようとしましたcall_later()が、それを行うためのネイティブな方法があり、巧妙ではないと思います。
c) app.py を起動し、TcpClient が接続できない場合は、5 秒後に再接続を試みます。その方法がわかりません。

ここで私の例は app.py server.py のテストです。server.py はテスト用です。これは別の言語になります。

私が試したことを言うと:
1)app.pyを起動してserver.pyがダウンすると、app.pyは再試行しません。
2)app.pyがserver.pyに接続され、サーバーがダウンしてすぐにアップすると、TcpClientが再接続しますが、新しいインスタンスで他のメソッドを接続して文字列「send to tcp」を送信することはできません。server.py に接続します。これ以上の接続がない古いものだけです。
3)asyncio.async()代わりに使用するrun_until_complete()と、他のプロトコルからメソッドを呼び出すことができません。

app.py と server.py をここに置いたので、コピーして実行するだけでテストできます。

「send to tcp」という文字列を送信するために使用ncat localhost 9000 -u -vします。この文字列を UdpServer クラスに出力し、TcpClient クラスのメソッド send_data_to_tcp に渡す必要があります。このメソッドは文字列を server.py に送信します。<- これは、tcpClient の最初の再接続後は機能しません。

Python 3.4.0 を使用しています。

どうもありがとう。

app.py:

import asyncio

#TCP client
class TcpClient(asyncio.Protocol):
    message = 'Testing'

    def connection_made(self, transport):
        self.transport = transport
        self.transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))
        server_udp[1].tcp_client_connected()


    def data_received(self, data):
        self.data = format(data.decode())
        print('data received: {}'.format(data.decode()))
        if self.data == 'Testing':
            server_udp[1].send_data_to_udp(self.data)

    def send_data_to_tcp(self, data):
        self.transport.write(data.encode())

    def connection_lost(self, exc):
        msg = 'Connection lost with the server...'
        info = self.transport.get_extra_info('peername')
        server_udp[1].tcp_client_disconnected(msg, info)


#UDP Server
class UdpServer(asyncio.DatagramProtocol):

    CLIENT_TCP_TIMEOUT = 5.0

    def __init__(self):
        self.client_tcp_timeout = None

    def connection_made(self, transport):
        print('start', transport)
        self.transport = transport

    def datagram_received(self, data, addr):
        self.data = data.strip()
        self.data = self.data.decode()
        print('Data received:', self.data, addr)
        if self.data == 'send to tcp.':
            client_tcp[1].send_data_to_tcp(self.data)

    def connection_lost(self, exc):
        print('stop', exc)

    def send_data_to_udp(self, data):
        print('Receiving on UDPServer Class: ', (data))

    def connect_client_tcp(self):
        coro = loop.create_connection(TcpClient, 'localhost', 8000)
        #client_tcp = loop.run_until_complete(coro)
        client_tcp = asyncio.async(coro)

    def tcp_client_disconnected(self, data, info):
        print(data)
        self.client_tcp_info = info
        self.client_tcp_timeout = asyncio.get_event_loop().call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

    def tcp_client_connected(self):
        if self.client_tcp_timeout:
            self.client_tcp_timeout.cancel()
            print('call_later cancel.')

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000)) 
#server_udp = asyncio.Task(coro)
server_udp = loop.run_until_complete(coro)


#TCP client
coro = loop.create_connection(TcpClient, 'localhost', 8000)
#client_tcp = asyncio.async(coro)
client_tcp = loop.run_until_complete(coro)

loop.run_forever()

サーバー.py:

import asyncio

class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        print('data received: {}'.format(data.decode()))
        self.transport.write(data)

        # close the socket
        #self.transport.close()

    #def connection_lost(self):
    #    print('server closed the connection')



loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServer, 'localhost', 8000)
server = loop.run_until_complete(coro)
print(server)
print(dir(server))
print(dir(server.sockets))

print('serving on {}'.format(server.sockets[0].getsockname()))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print("exit")
finally:
    server.close()
    loop.close()
4

1 に答える 1

13

本当にいくつかの小さな修正が必要です。まず、接続の再試行を処理するコルーチンを作成しました。

@asyncio.coroutine
def do_connect():
    global tcp_server  # Make sure we use the global tcp_server
    while True:
        try:
            tcp_server = yield from loop.create_connection(TcpClient, 
                                                           'localhost', 8000)
        except OSError:
            print("Server not up retrying in 5 seconds...")
            yield from asyncio.sleep(5)
        else:
            break

次に、これを使用してすべてを開始します。

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000)) 
server_udp = loop.run_until_complete(coro)

#TCP client
loop.run_until_complete(do_connect())

loop.run_forever()

次の部分は、app.py の起動後にサーバーのダウン/復帰を処理することです。これを修正tcp_client_disconnectedconnect_client_tcpて適切に処理する必要があります。

def connect_client_tcp(self):
    global client_tcp
    task = asyncio.async(do_connect())
    def cb(result):
        client_tcp = result
    task.add_done_callback(cb)

def tcp_client_disconnected(self, data, info):
    print(data)
    self.client_tcp_info = info
    self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

興味深い作品はconnect_client_tcp. 元のバージョンには 2 つの問題がありました。

  1. client_tcpの結果に直接代入していました。これは、 に代入されたことasyncio.async(coro)を意味client_tcpasyncio.Taskます。それはあなたが望んでいたことではありません。client_tcpあなたは完成したの結果に割り当てられたかったasyncio.Task. 完了したら、 を使用して結果task.add_done_callbackに代入することで、これを実現します。client_tcpTask

  2. global client_tcpメソッドの先頭を忘れました。それがなければ、 という名前のローカル変数を作成しただけclient_tcpで、 の最後で破棄されていましたconnect_client_tcp

これらの問題が修正されると、必要なときにいつでも を実行しapp.pyたり、開始/停止したりできますが、3 つのコンポーネントがすべて一緒に実行されている場合は、 からにserv.py適切に配信されたすべてのメッセージが常に表示されます。ncatserv.py

app.py簡単にコピー/貼り付けできるように、完全な を次に示します。

import asyncio

#TCP client
class TcpClient(asyncio.Protocol):
    message = 'Testing'

    def connection_made(self, transport):
        self.transport = transport
        self.transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))
        server_udp[1].tcp_client_connected()


    def data_received(self, data):
        self.data = format(data.decode())
        print('data received: {}'.format(data.decode()))
        if self.data == 'Testing':
            server_udp[1].send_data_to_udp(self.data)

    def send_data_to_tcp(self, data):
        self.transport.write(data.encode())

    def connection_lost(self, exc):
        msg = 'Connection lost with the server...'
        info = self.transport.get_extra_info('peername')
        server_udp[1].tcp_client_disconnected(msg, info)


#UDP Server
class UdpServer(asyncio.DatagramProtocol):

    CLIENT_TCP_TIMEOUT = 5.0

    def __init__(self):
        self.client_tcp_timeout = None

    def connection_made(self, transport):
        print('start', transport)
        self.transport = transport

    def datagram_received(self, data, addr):
        self.data = data.strip()
        self.data = self.data.decode()
        print('Data received:', self.data, addr)
        if self.data == 'send to tcp.':
            client_tcp[1].send_data_to_tcp(self.data)

    def connection_lost(self, exc):
        print('stop', exc)

    def send_data_to_udp(self, data):
        print('Receiving on UDPServer Class: ', (data))

    def connect_client_tcp(self):
        global client_tcp
        coro = loop.create_connection(TcpClient, 'localhost', 8000)
        task = asyncio.async(do_connect())
        def cb(result):
            client_tcp = result
        task.add_done_callback(cb)

    def tcp_client_disconnected(self, data, info):
        print(data)
        self.client_tcp_info = info
        self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

    def tcp_client_connected(self):
        if self.client_tcp_timeout:
            self.client_tcp_timeout.cancel()
            print('call_later cancel.')

@asyncio.coroutine
def do_connect():
    global client_tcp
    while True:
        try:
            client_tcp = yield from loop.create_connection(TcpClient, 'localhost', 8000)
        except OSError:
            print("Server not up retrying in 5 seconds...")
            yield from asyncio.sleep(1)
        else:
            break

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
server_udp = loop.run_until_complete(coro)

#TCP client
loop.run_until_complete(do_connect())

loop.run_forever()
于 2014-09-23T21:00:56.717 に答える