13

asyncio.Protocol.data_received新しい Python asyncioモジュールのコールバックで非同期処理を行う際に問題が発生しています。

次のサーバーを検討してください。

class MathServer(asyncio.Protocol):

   @asyncio.coroutine
   def slow_sqrt(self, x):
      yield from asyncio.sleep(1)
      return math.sqrt(x)

   def fast_sqrt(self, x):
      return math.sqrt(x)

   def connection_made(self, transport):
      self.transport = transport

   #@asyncio.coroutine
   def data_received(self, data):
      print('data received: {}'.format(data.decode()))
      x = json.loads(data.decode())
      #res = self.fast_sqrt(x)
      res = yield from self.slow_sqrt(x)
      self.transport.write(json.dumps(res).encode('utf8'))
      self.transport.close()

次のクライアントで使用:

class MathClient(asyncio.Protocol):

   def connection_made(self, transport):
      transport.write(json.dumps(2.).encode('utf8'))

   def data_received(self, data):
      print('data received: {}'.format(data.decode()))

   def connection_lost(self, exc):
      asyncio.get_event_loop().stop()

self.fast_sqrt呼び出されると、すべてが期待どおりに機能します。

ではself.slow_sqrt、動作しません。

また、self.fast_sqrtおよび の@asyncio.coroutineデコレータでは機能しませんdata_received

ここで根本的な何かが欠けているように感じます。

完全なコードは次のとおりです。

テスト済み:

  • Python 3.4.0b1 (Windows)
  • Python 3.3.3 + asyncio-0.2.1 (FreeBSD)

問題は両方で同じです。 をslow_sqrt使用すると、クライアント/サーバーは何もせずにハングします。

4

2 に答える 2

10

これは a を介して分離する必要があるようですFutureが、これが正しい方法であるかどうかはまだわかりません。

class MathServer(asyncio.Protocol):

   @asyncio.coroutine
   def slow_sqrt(self, x):
      yield from asyncio.sleep(2)
      return math.sqrt(x)

   def fast_sqrt(self, x):
      return math.sqrt(x)

   def consume(self):
      while True:
         self.waiter = asyncio.Future()
         yield from self.waiter
         while len(self.receive_queue):
            data = self.receive_queue.popleft()
            if self.transport:
               try:
                  res = self.process(data)
                  if isinstance(res, asyncio.Future) or \
                     inspect.isgenerator(res):
                     res = yield from res
               except Exception as e:
                  print(e)

   def connection_made(self, transport):
      self.transport = transport
      self.receive_queue = deque()
      asyncio.Task(self.consume())

   def data_received(self, data):
      self.receive_queue.append(data)
      if not self.waiter.done():
         self.waiter.set_result(None)
      print("data_received {} {}".format(len(data), len(self.receive_queue)))

   def process(self, data):
      x = json.loads(data.decode())
      #res = self.fast_sqrt(x)
      res = yield from self.slow_sqrt(x)
      self.transport.write(json.dumps(res).encode('utf8'))
      #self.transport.close()

   def connection_lost(self, exc):
      self.transport = None

Guido van Rossumによる回答は次のとおりです。

解決策は簡単です。そのロジックを でマークされた別のメソッドとして記述し、 (この場合は )を使用し @coroutineて起動します。これがプロトコルに組み込まれていない理由は、組み込まれている場合、コルーチンを処理するために別のイベント ループの実装が必要になるためです。data_received()async()== Task()

def data_received(self, data):
    asyncio.ensure_future(self.process_data(data))

@asyncio.coroutine
def process_data(self, data):
    # ...stuff using yield from...

完全なコードはこちら: -クライアント -サーバー

于 2013-12-23T17:45:50.700 に答える