直接使用するべきではありませんが、問題なくasyncio
安全に組み合わせることができるはずです。(およびその他のイベントループ ベースの非同期フレームワーク) の大罪は、イベント ループをブロックしています。直接使用しようとすると、子プロセスを待機するためにブロックするたびに、イベント ループがブロックされます。明らかに、これは悪いことです。multiprocessing
multiprocessing
asyncio
multiprocessing
これを回避する最も簡単な方法は、 を使用BaseEventLoop.run_in_executor
して関数を実行することconcurrent.futures.ProcessPoolExecutor
です。ProcessPoolExecutor
は、 を使用して実装されたプロセス プールですがmultiprocessing.Process
、asyncio
イベント ループをブロックすることなく関数を実行するためのサポートが組み込まれています。簡単な例を次に示します。
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
def blocking_func(x):
time.sleep(x) # Pretend this is expensive calculations
return x * 5
@asyncio.coroutine
def main():
#pool = multiprocessing.Pool()
#out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
executor = ProcessPoolExecutor()
out = yield from loop.run_in_executor(executor, blocking_func, 10) # This does not
print(out)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
ほとんどの場合、これは機能だけで十分です。、、 、 などの他の構造が必要な場合multiprocessing
は、すべてのデータ構造の互換バージョンを提供する (完全開示: 私が書いた)というサードパーティ ライブラリがあります。これをデモする例を次に示します。Queue
Event
Manager
aioprocessing
asyncio
multiprocessing
import time
import asyncio
import aioprocessing
import multiprocessing
def func(queue, event, lock, items):
with lock:
event.set()
for item in items:
time.sleep(3)
queue.put(item+5)
queue.close()
@asyncio.coroutine
def example(queue, event, lock):
l = [1,2,3,4,5]
p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
p.start()
while True:
result = yield from queue.coro_get()
if result is None:
break
print("Got result {}".format(result))
yield from p.coro_join()
@asyncio.coroutine
def example2(queue, event, lock):
yield from event.coro_wait()
with (yield from lock):
yield from queue.coro_put(78)
yield from queue.coro_put(None) # Shut down the worker
if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue = aioprocessing.AioQueue()
lock = aioprocessing.AioLock()
event = aioprocessing.AioEvent()
tasks = [
asyncio.async(example(queue, event, lock)),
asyncio.async(example2(queue, event, lock)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()