multiprocessing.Queue
で使用できるオブジェクトの実装を次に示しますasyncio
。およびメソッドをmultiprocessing.Queue
追加して、インターフェイス全体を提供します。これらのメソッドは、非同期的にキューからの取得/キューへの書き込みに使用できます。実装の詳細は、私の他の回答の 2 番目の例と本質的に同じです: get/put を非同期にするために使用され、プロセス間でキューを共有するために a が使用されます。唯一の追加のトリックは、インスタンス変数として非 picklable を使用しているにもかかわらず、オブジェクトを picklable に保つように実装することです。coro_get
coro_put
asyncio.coroutine
ThreadPoolExecutor
multiprocessing.managers.SyncManager.Queue
__getstate__
ThreadPoolExecutor
from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
def AsyncProcessQueue(maxsize=0):
m = Manager()
q = m.Queue(maxsize=maxsize)
return _ProcQueue(q)
class _ProcQueue(object):
def __init__(self, q):
self._queue = q
self._real_executor = None
self._cancelled_join = False
@property
def _executor(self):
if not self._real_executor:
self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
return self._real_executor
def __getstate__(self):
self_dict = self.__dict__
self_dict['_real_executor'] = None
return self_dict
def __getattr__(self, name):
if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
'get', 'get_nowait', 'close']:
return getattr(self._queue, name)
else:
raise AttributeError("'%s' object has no attribute '%s'" %
(self.__class__.__name__, name))
@asyncio.coroutine
def coro_put(self, item):
loop = asyncio.get_event_loop()
return (yield from loop.run_in_executor(self._executor, self.put, item))
@asyncio.coroutine
def coro_get(self):
loop = asyncio.get_event_loop()
return (yield from loop.run_in_executor(self._executor, self.get))
def cancel_join_thread(self):
self._cancelled_join = True
self._queue.cancel_join_thread()
def join_thread(self):
self._queue.join_thread()
if self._real_executor and not self._cancelled_join:
self._real_executor.shutdown()
@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
ok = stuff + stuff2
print("Passing %s to parent" % ok)
yield from q.coro_put(ok) # Non-blocking
item = q.get() # Can be used with the normal blocking API, too
print("got %s back from parent" % item)
def do_coro_proc_work(q, stuff, stuff2):
loop = asyncio.get_event_loop()
loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))
@asyncio.coroutine
def do_work(q):
loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
do_coro_proc_work, q, 1, 2)
item = yield from q.coro_get()
print("Got %s from worker" % item)
item = item + 25
q.put(item)
if __name__ == "__main__":
q = AsyncProcessQueue()
loop = asyncio.get_event_loop()
loop.run_until_complete(do_work(q))
出力:
Passing 3 to parent
Got 3 from worker
got 28 back from parent
AsyncProcessQueue
ご覧のとおり、親プロセスまたは子プロセスから、同期および非同期の両方で を使用できます。グローバルな状態を必要とせず、複雑さのほとんどをクラスにカプセル化することで、元の答えよりもエレガントに使用できます。
おそらく、ソケットを直接使用してパフォーマンスを向上させることができますが、クロスプラットフォームの方法で動作させるのはかなり難しいようです. これには、複数のワーカー間で使用できるという利点もあり、自分でピクル/ピクル解除する必要がありません。