36

Python のマルチプロセッシング モジュールに関するさまざまなチュートリアルを読んでいますが、なぜ/いつ呼び出すのか理解できませんprocess.join()。たとえば、次の例に出くわしました。

nums = range(100000)
nprocs = 4

def worker(nums, out_q):
    """ The worker function, invoked in a process. 'nums' is a
        list of numbers to factor. The results are placed in
        a dictionary that's pushed to a queue.
    """
    outdict = {}
    for n in nums:
        outdict[n] = factorize_naive(n)
    out_q.put(outdict)

# Each process will get 'chunksize' nums and a queue to put his out
# dict into
out_q = Queue()
chunksize = int(math.ceil(len(nums) / float(nprocs)))
procs = []

for i in range(nprocs):
    p = multiprocessing.Process(
            target=worker,
            args=(nums[chunksize * i:chunksize * (i + 1)],
                  out_q))
    procs.append(p)
    p.start()

# Collect all results into a single result dict. We know how many dicts
# with results to expect.
resultdict = {}
for i in range(nprocs):
    resultdict.update(out_q.get())

# Wait for all worker processes to finish
for p in procs:
    p.join()

print resultdict

私が理解していることからprocess.join()、 join メソッドが呼び出されたプロセスの実行が完了するまで、呼び出しプロセスをブロックします。また、上記のコード例で開始された子プロセスは、ターゲット関数の完了時に実行を完了すると考えています。つまり、結果を にプッシュした後out_qです。最後に、out_q.get()プルする結果が得られるまで呼び出しプロセスをブロックすると思います。したがって、コードを検討すると、次のようになります。

resultdict = {}
for i in range(nprocs):
    resultdict.update(out_q.get())

# Wait for all worker processes to finish
for p in procs:
    p.join()

メイン プロセスは、すべてのワーカー プロセスがその結果をキューにプッシュし終わるout_q.get()まで、呼び出しによってブロックされます。したがって、メイン プロセスが for ループを終了するまでに、各子プロセスは実行を完了しているはずですよね?

その場合、p.join()この時点でメソッドを呼び出す理由はありますか? すべてのワーカー プロセスがまだ終了していない場合、メイン プロセスはどのようにして「すべてのワーカー プロセスが終了するのを待つ」ようになるのでしょうか? 私が質問する主な理由は、これを複数の異なる例で見たことがあるためです。何かを理解できていないかどうか知りたいです。

4

3 に答える 3

22

を呼び出す直前の時点でjoin、すべてのワーカーが結果をキューに入れていますが、必ずしも戻ってくるとは限らず、プロセスがまだ終了していない可能性があります。タイミングによっては、そうする場合としない場合があります。

呼び出しjoinにより、すべてのプロセスに適切に終了する時間が与えられます。

于 2013-01-20T22:04:59.907 に答える
21

これを実行してみてください:

import math
import time
from multiprocessing import Queue
import multiprocessing

def factorize_naive(n):
    factors = []
    for div in range(2, int(n**.5)+1):
        while not n % div:
            factors.append(div)
            n //= div
    if n != 1:
        factors.append(n)
    return factors

nums = range(100000)
nprocs = 4

def worker(nums, out_q):
    """ The worker function, invoked in a process. 'nums' is a
        list of numbers to factor. The results are placed in
        a dictionary that's pushed to a queue.
    """
    outdict = {}
    for n in nums:
        outdict[n] = factorize_naive(n)
    out_q.put(outdict)

# Each process will get 'chunksize' nums and a queue to put his out
# dict into
out_q = Queue()
chunksize = int(math.ceil(len(nums) / float(nprocs)))
procs = []

for i in range(nprocs):
    p = multiprocessing.Process(
            target=worker,
            args=(nums[chunksize * i:chunksize * (i + 1)],
                  out_q))
    procs.append(p)
    p.start()

# Collect all results into a single result dict. We know how many dicts
# with results to expect.
resultdict = {}
for i in range(nprocs):
    resultdict.update(out_q.get())

time.sleep(5)

# Wait for all worker processes to finish
for p in procs:
    p.join()

print resultdict

time.sleep(15)

そしてタスクマネージャーを開きます。OS によって終了される前に、4 つのサブプロセスが数秒間ゾンビ状態になることがわかるはずです (join 呼び出しのため)。

ここに画像の説明を入力

より複雑な状況では、子プロセスは永久にゾンビ状態のままになる可能性があり (他の質問で尋ねていた状況のように)、十分な数の子プロセスを作成すると、プロセス テーブルがいっぱいになり、OS に問題が発生する可能性があります (これにより、強制終了される可能性があります)。失敗を避けるためのメイン プロセス)。

于 2013-01-20T22:28:38.443 に答える