3

python、asyncio、およびasyncqpを使用して、複数のキューを同時に消費しようとしています。

asyncio.sleep()関数呼び出しが効果がない理由がわかりません。コードはそこで一時停止しません。公平を期すために、コールバックがどのコンテキストで実行されるか、および制御 bavck をイベント ループに渡すことができるかどうか (asyncio.sleep()呼び出しが意味を持つように) を実際には理解していません。

aiohttp.ClientSession.get()コールバック関数で関数呼び出しを使用する必要がある場合はどうすればよいprocess_msgですか? コルーチンではないのでできません。私の現在の asyncio の理解を超える方法が必要です。

#!/usr/bin/env python3

import asyncio
import asynqp


USERS = {'betty', 'bob', 'luis', 'tony'}


def process_msg(msg):
    asyncio.sleep(10)
    print('>> {}'.format(msg.body))
    msg.ack()

async def connect():
    connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
    channel = await connection.open_channel()
    exchange = await channel.declare_exchange('inboxes', 'direct')

    # we have 10 users. Set up a queue for each of them
    # use different channels to avoid any interference
    # during message consumption, just in case.
    for username in USERS:
        user_channel = await connection.open_channel()
        queue = await user_channel.declare_queue('Inbox_{}'.format(username))
        await queue.bind(exchange, routing_key=username)
        await queue.consume(process_msg)

    # deliver 10 messages to each user
    for username in USERS:
        for msg_idx in range(10):
            msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
            exchange.publish(msg, routing_key=username)


loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()
4

2 に答える 2

3

asyncio.sleep() 関数呼び出しが効果がない理由がわかりません。

イベント ループ (またはセマンティクス)asyncio.sleep()と組み合わせて使用​​する必要がある未来のオブジェクトを返すためです。async/await

コールバックは、ボンネットの下のイベントループに接続されているコンテキストの外側で呼び出されるため、await単純な宣言では使用できません。つまり、コールバック スタイルとスタイルを混在させるのは非常に難しいことです。defasync/awaitasync/await

ただし、簡単な解決策は、作業をイベント ループに戻すようにスケジュールすることです。

async def process_msg(msg):
    await asyncio.sleep(10)
    print('>> {}'.format(msg.body))
    msg.ack()

def _process_msg(msg):
    loop = asyncio.get_event_loop()
    loop.create_task(process_msg(msg))
    # or if loop is always the same one single line is enough
    # asyncio.ensure_future(process_msg(msg))

# some code
await queue.consume(_process_msg)

function には再帰がないことに注意してください_process_msg。つまり、 の本体はprocess_msgwhile in では実行されません_process_msgprocess_msgコントロールがイベントループに戻ると、内部関数が呼び出されます。

これは、次のコードで一般化できます。

def async_to_callback(coro):
    def callback(*args, **kwargs):
        asyncio.ensure_future(coro(*args, **kwargs))
    return callback

async def process_msg(msg):
    # the body

# some code
await queue.consume(async_to_callback(process_msg))
于 2016-08-11T14:34:21.357 に答える
1

解決策については、github でDrizzt1991 の応答を参照してください。

于 2016-08-11T14:00:04.963 に答える