3

これが私のコードです。この他の質問がやろうとしていることと非常によく似たものを行うことになっています。特に、この図は関連しています: プロセス図 f1 = 生産、f2 = f3 = 労働者、f4 = 消費者。

私はまだすべてをうまく終わらせるという問題に取り組もうとはしていません。それはこの質問の目的ではありません。「RuntimeError: Queue objects should be shared only between processes through inheritance」というエラーが表示されます。これを修正する方法がわかりません。Go のチャネルのような関数にキューを渡したいだけです。これがコードです。

import multiprocessing


def produce(n, queue):
    for i in xrange(n):
        queue.put(i)

def worker(in_queue, out_queue):
    for i in iter( in_queue.get, None):
        out_queue.put(i*i)

def consumer(queue):
    ans = []
    for i in iter( queue.get, None):
        ans.append(i)
    return ans


def main(n):
    pool = multiprocessing.Pool(4)
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    pool.apply_async(produce, (n, in_queue))
    for i in range(2):
        pool.apply_async(worker, (in_queue, out_queue))
    result = consumer(out_queue)
    pool.close()
    pool.join()
    return result

main(200)

どうすれば修正できますか?

それを行う簡単な方法はありますか?

試してみPool.mapましたが、これを機能させたいと思います。

4

2 に答える 2

2

は、必要なmultiprocessing.PoolIPC メカニズムを既に設定しており、開始後にワーカーにジョブを送信できるようになっていますが、後で引数として Queue などを渡すことはできません。それがあなたのコードが機能しない理由です。サブプロセスが開始された時点で、サブプロセスはその親と通信する方法を既に知っている必要があります。

したがって、独自のキューを設定する必要がある場合は、multiprocessing.Process直接使用する必要があります。また、あなたが書いているのは、新しいジョブをループで待って処理する典型的なワーカーです。ワーカーのプールでそのようなワーカーを実行することは、やりたいことではありません。

このようにあなたのコードは動作します:

import multiprocessing


def produce(n, queue):
    for i in xrange(n):
        queue.put(i)

def worker(in_queue, out_queue):
    for i in iter( in_queue.get, None):
        out_queue.put(i*i)

def consumer(queue):
    ans = []
    for i in iter( queue.get, None):
        print(i)
        ans.append(i)
    return ans


def main(n):
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()
    producer = multiprocessing.Process(target=produce, args=(n, in_queue))
    for i in range(2):
        w = multiprocessing.Process(target=worker, args=(in_queue, out_queue))
        w.start()
    producer.start()
    res = consumer(out_queue)

main(200)

consumer何かが起こっていることを示すために、print ステートメントを追加しました。キューから読み取るコードは、ワーカーもプロデューサーもキューに入れたことがないため、決して来ないconsumer終了を待機するため、関数は決して終了しません...None

于 2013-06-25T19:36:47.683 に答える