1

ここの公式ドキュメントには、次の例が示されています。

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

メインスレッドが続行する前に、この時点ですべてのスレッドが強制終了されていることを確認したいと考えています。キュー内のすべてのタスクが処理された後、q.get() メソッドで例外が発生し、スレッドが強制終了されると思います。あれは正しいですか?

4

2 に答える 2

2

いいえ。キューにアイテムがない場合、getデフォルトでは、アイテムがキューに入れられるのを待ちます。アイテムがなくなったときに例外を発生させたい場合は、それを渡すか、 をblock=False使用しますget_nowait

non-blocking を使用すると、getすべてが機能するはずですが、通常のケースで例外が発生したためにスレッドが停止することはあまりありません。ブロックで囲むことをお勧めtryします。キューが空であるために例外がスローされた場合は、スレッドをきれいに停止します。

try:
    item = q.get(block=False)
except queue.Empty:
    return
于 2013-02-10T02:41:27.057 に答える
0

do_work()の呼び出しのいずれかで例外が発生した場合、それを実行するスレッドは終了します。この場合、フォローされていない ため、メインスレッドは永久にブロックされます。q.join()q.get()q.task_done()

スレッド プールを使用して例を書き直すことができます。

from multiprocessing.dummy import Pool # use threads

p = Pool(num_worker_threads)    
for _ in p.imap_unordered(do_work, source()):
    pass
p.close()
p.join() # no threads after this point

この場合、 ifdo_work()は例外を発生させます。それはメインスレッドに伝播され、終了します (プールスレッドはデーモンであるため、プログラムの実行を維持しません)。

ベースのソリューションに代わる別の方法は、Queueセンチネル値をキューに入れ (スレッドごとに 1 つの値) worker()、センチネルが検出された場合に終了することです。:

STOP = object()

def worker(queue):
    for item in iter(queue.get, STOP): # until STOP is encountered
        do_work(item)

# instead of `q.join()`
for _ in threads: q.put(STOP)
for t in threads: t.join() # no threads after this point
于 2013-05-10T09:02:21.523 に答える