0

既存の機能をブロックすることなく、タイムアウト期間後に Tornado を停止させようとしています。Tornado の規則が欠けている可能性がありますが、spawn_callback、Task、または Thread を使用するかどうかは関係ありません。メイン ループを停止しているようです。

まず、私がこれを行っている理由は、クライアント アプリで世界的に有名な NATS メッセージ バスを使用してメッセージを発行し (通常の単純な HTTP 機能ではありません)、サブスクライブされた応答を待ちたいからです。非同期動作の典型的な問題であり、公式の NATS Python クライアントは Tornado を使用しているため、私もそれを使用しようとしています。

私の問題は、 tornado.gen.coroutine デコレータがスレッドでどのように機能するかを理解することに関係していると思われます。

私のコードのクリップを以下に示します。誰かが私の明らかな問題に気づいたら、ポインタをいただければ幸いです。ありがとう!

class Delayed(Thread):
   def __init__(self, callback=None, timeout=2, *args, **kwargs):
        super(Delayed, self).__init__(*args, **kwargs)
        self.callback = callback
        self.timeout = timeout

   def run(self):
        time.sleep(self.timeout)
        if self.callback != None:
            self.callback()

def timeout_task(timeout_secs=2):
    time.sleep(timeout_secs)
    ioloop.IOLoop.instance().stop()
    yield

@tornado.gen.coroutine
def main():
    parser = argparse.ArgumentParser()

    parser.add_argument('CommandType')
    ...
    parser.add_argument('-s', '--servers', default=[], action='append')
    args = parser.parse_args()

    try:
      timeout=args.wait

      servers = args.servers
      queue = ""
      ...
      if len(args.servers) < 1:
          servers = ["nats://127.0.0.1:4222"]

      data = funct_to_get_data()

      nc = NATS()
      opts = { "servers": servers }
      yield nc.connect(**opts)

      def self_stop():
          ioloop.IOLoop.instance().stop()

      def handler(msg):
          print("[Received: {0}] {1}".format(msg.subject, msg.data))

      print("Subscribed to '{0}'".format(subject))
      future = nc.subscribe(subject, queue, handler)
      sid = future.result()

      yield nc.publish(subject, data)
      yield nc.flush()
      print("Published to '{0}'".format(subject))

      # HERE is where I'd like to setup a non-blocking timeout that
      # will stop Tornado.

      # spawn_callback blocks and prevents future from receiving anything.
      #ioloop.IOLoop.current().spawn_callback(lambda: timeout_task(timeout))

      # Task blocks and prevents future from receiving anything. 
      yield tornado.gen.Task(timeout_task, timeout)

      # Straight attempt at a Thread will block as well.
      Delayed(self_stop(), timeout).start()
    except Exception, e:
      print(e)
      show_usage_and_die()

if __name__ == '__main__':
    main()
    ioloop.IOLoop.instance().start()
4

1 に答える 1