python、asyncio、およびasyncqpを使用して、複数のキューを同時に消費しようとしています。
asyncio.sleep()
関数呼び出しが効果がない理由がわかりません。コードはそこで一時停止しません。公平を期すために、コールバックがどのコンテキストで実行されるか、および制御 bavck をイベント ループに渡すことができるかどうか (asyncio.sleep()
呼び出しが意味を持つように) を実際には理解していません。
aiohttp.ClientSession.get()
コールバック関数で関数呼び出しを使用する必要がある場合はどうすればよいprocess_msg
ですか? コルーチンではないのでできません。私の現在の asyncio の理解を超える方法が必要です。
#!/usr/bin/env python3
import asyncio
import asynqp
USERS = {'betty', 'bob', 'luis', 'tony'}
def process_msg(msg):
asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()
async def connect():
connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
channel = await connection.open_channel()
exchange = await channel.declare_exchange('inboxes', 'direct')
# we have 10 users. Set up a queue for each of them
# use different channels to avoid any interference
# during message consumption, just in case.
for username in USERS:
user_channel = await connection.open_channel()
queue = await user_channel.declare_queue('Inbox_{}'.format(username))
await queue.bind(exchange, routing_key=username)
await queue.consume(process_msg)
# deliver 10 messages to each user
for username in USERS:
for msg_idx in range(10):
msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
exchange.publish(msg, routing_key=username)
loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()