3

Python ドキュメントのコード例を次に示します。

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

次のようなユースケースに合わせて変更しました。

import threading
from Queue import Queue

max_threads = 10

q = Queue(maxsize=max_threads + 2)

def worker():
  while True:
    task = q.get(1)
    # do something with the task
    q.task_done()

for i in range(max_threads):
  t = threading.Thread(target=worker)
  t.start()

for task in ['a', 'b', 'c']:
  q.put(task)

q.join()

実行すると、デバッガーはすべてのジョブが実行されたと表示しますが、q.join() は永遠に待機しているようです。すでにすべてのタスクを送信したというシグナルをワーカー スレッドに送信するにはどうすればよいですか?

4

3 に答える 3

2

.join()ワーカー スレッドが新しいキュー データを待機し続けるため、このプロセスは で終了しません(ブロッキング.get())

これは単純なフラグfinishUpを使用してワーカーに終了を伝えるメソッドです。これは、.join()すべてのタスクが処理されたことを意味します。呼び出しにタイムアウトを追加して、フラグq.get()をチェックできるようにしましたfinishUp

import threading
import queue

max_threads = 5
q = queue.Queue(maxsize=max_threads + 2)
finishUp = False

def worker():
    while True:
        try:
            task = q.get(block=True, timeout=1)
            # do something with the task
            print ("processing task for:"+str(task))
            q.task_done()
        except Exception as ex: # we get this exception when queue is empty
            if finishUp:
                print ("thread finishing because processing is done")
                return

for i in range(max_threads):
  t = threading.Thread(target=worker)
  t.start()

for task in ['a', 'b', 'c']:
  q.put(task)

print ("waiting on join")
q.join()
finishUp = True  # let the workers know that they can exit
print ("finished")

これにより、次の出力が生成されます。

waiting on join
processing task for:a
processing task for:b
processing task for:c
finished
thread finishing because processing is done
thread finishing because processing is done
thread finishing because processing is done
thread finishing because processing is done
thread finishing because processing is done

Process finished with exit code 0
于 2019-01-05T19:59:56.330 に答える
1

DONE作業の終了を通知するオブジェクトを定義しました。

DONE = object()

上位レベルがそれ以上データが来ないことを知っているとき、文字通りそれをキューに入れます:

q.put_nowait(DONE)

ワーカー スレッドでは、オブジェクトが受信されるとすぐにスレッドが終了します。しかし、まったく同じキューでリッスンしている他のスレッドがある場合は、オブジェクトをキューに戻す必要があります。

item = q.get()
if item is DONE:
    q.put_nowait(DONE)
    return

乾杯 :)

于 2020-08-06T01:25:37.547 に答える