17

を介して起動される親プロセスから子プロセスにデータを渡すためにキューを使用したいと思いますmultiprocessing.Process。ただし、親プロセスは Python の新しいasyncioライブラリを使用するため、キュー メソッドはノンブロッキングである必要があります。私の知る限り、asyncio.Queueはタスク間通信用に作成されており、プロセス間通信には使用できません。また、メソッドとメソッドがあることmultiprocessing.Queueは知っていますが、実際には現在のタスクをブロックするコルーチンが必要です (プロセス全体ではありません)。/をラップするコルーチンを作成する方法はありますか? 別のメモとして、同じプロセスで実行されているイベント ループと内部的に互換性があるスレッドを使用していますか?put_nowait()get_nowait()put_nowait()get_nowait()multiprocessing.Queue

そうでない場合、他にどのようなオプションがありますか? 非同期ソケットを利用することでそのようなキューを自分で実装できることは知っていますが、それを回避できることを願っていました...

編集: ソケットの代わりにパイプを使用することも検討しましたが、asyncio互換性がないようですmultiprocessing.Pipe()。より正確には、ファイルのようなオブジェクトではないPipe()オブジェクトのタプルを返します。ただし、のメソッド/メソッドおよび/すべてはファイルのようなオブジェクトを想定しているため、そのような. 対照的に、パッケージがパイプとして使用する通常のファイルのようなオブジェクトはまったく問題なく、簡単に と組み合わせて使用​​できますConnectionasyncio.BaseEventLoopadd_reader()add_writer()connect_read_pipe()connect_write_pipe()Connectionsubprocessasyncio

更新: パイプ アプローチをもう少し調査することにしました。ファイル記述子を 経由で取得し、それを に渡すことで、 からConnection返されたオブジェクトmultiprocessing.Pipe()をファイルのようなオブジェクトに変換しました。最後に、結果のファイルのようなオブジェクトをイベント ループの/に渡しました。(誰かが正確なコードに興味がある場合は、関連する問題に関するメーリング リストの議論があります。) しかし、ストリームを ing すると、これを修正することができませんでした。また、 Windows のサポートが不足していることを考慮すると、これ以上追求するつもりはありません。fileno()os.fdopen()connect_read_pipe()connect_write_pipe()read()OSError: [Errno 9] Bad file descriptor

4

3 に答える 3

20

multiprocessing.Queueで使用できるオブジェクトの実装を次に示しますasyncio。およびメソッドをmultiprocessing.Queue追加して、インターフェイス全体を提供します。これらのメソッドは、非同期的にキューからの取得/キューへの書き込みに使用できます。実装の詳細は、私の他の回答の 2 番目の例と本質的に同じです: get/put を非同期にするために使用され、プロセス間でキューを共有するために a が使用されます。唯一の追加のトリックは、インスタンス変数として非 picklable を使用しているにもかかわらず、オブジェクトを picklable に保つように実装することです。coro_getcoro_putasyncio.coroutineThreadPoolExecutormultiprocessing.managers.SyncManager.Queue__getstate__ThreadPoolExecutor

from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def AsyncProcessQueue(maxsize=0):
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)   

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._real_executor = None
        self._cancelled_join = False

    @property
    def _executor(self):
        if not self._real_executor:
            self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
        return self._real_executor

    def __getstate__(self):
        self_dict = self.__dict__
        self_dict['_real_executor'] = None
        return self_dict

    def __getattr__(self, name):
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" % 
                                    (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.put, item))

    @asyncio.coroutine    
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._real_executor and not self._cancelled_join:
            self._real_executor.shutdown()

@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    print("Passing %s to parent" % ok)
    yield from q.coro_put(ok)  # Non-blocking
    item = q.get() # Can be used with the normal blocking API, too
    print("got %s back from parent" % item)

def do_coro_proc_work(q, stuff, stuff2):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))

@asyncio.coroutine
def do_work(q):
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_coro_proc_work, q, 1, 2)
    item = yield from q.coro_get()
    print("Got %s from worker" % item)
    item = item + 25
    q.put(item)

if __name__  == "__main__":
    q = AsyncProcessQueue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

出力:

Passing 3 to parent
Got 3 from worker
got 28 back from parent

AsyncProcessQueueご覧のとおり、親プロセスまたは子プロセスから、同期および非同期の両方で を使用できます。グローバルな状態を必要とせず、複雑さのほとんどをクラスにカプセル化することで、元の答えよりもエレガントに使用できます。

おそらく、ソケットを直接使用してパフォーマンスを向上させることができますが、クロスプラットフォームの方法で動作させるのはかなり難しいようです. これには、複数のワーカー間で使用できるという利点もあり、自分でピクル/ピクル解除する必要がありません。

于 2014-07-11T19:17:04.503 に答える
5

残念ながら、このmultiprocessingライブラリは での使用には特に適していません。/asyncioの使用方法によっては、次のように完全に置き換えることができる場合があります。multiprocessingmultprocessing.Queueconcurrent.futures.ProcessPoolExecutor

import asyncio
from concurrent.futures import ProcessPoolExecutor


def do_proc_work(stuff, stuff2):  # This runs in a separate process
    return stuff + stuff2

@asyncio.coroutine
def do_work():
    out = yield from loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                                          do_proc_work, 1, 2)
    print(out)

if __name__  == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work())

出力:

3

絶対に が必要な場合はmultiprocessing.Queue、 と組み合わせると問題なく動作するようですProcessPoolExecutor:

import asyncio
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def do_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    time.sleep(5) # Artificial delay to show that it's running asynchronously
    print("putting output in queue")
    q.put(ok)

@asyncio.coroutine
def async_get(q):
    """ Calls q.get() in a separate Thread. 

    q.get is an I/O call, so it should release the GIL.
    Ideally there would be a real non-blocking I/O-based 
    Queue.get call that could be used as a coroutine instead 
    of this, but I don't think one exists.

    """
    return (yield from loop.run_in_executor(ThreadPoolExecutor(max_workers=1), 
                                           q.get))

@asyncio.coroutine
def do_work(q):
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_proc_work, q, 1, 2)
    coro = async_get(q) # You could do yield from here; I'm not just to show that it's asynchronous
    print("Getting queue result asynchronously")
    print((yield from coro))

if __name__  == "__main__":
    m = multiprocessing.Manager()
    q = m.Queue() # The queue must be inherited by our worker, it can't be explicitly passed in
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

出力:

Getting queue result asynchronously
putting output in queue
3
于 2014-07-10T23:12:31.347 に答える