一度に実行されるタスクの数を制限するタスク キューを Python で作成しました。Queue.Queue
これは、キューに入れることができるアイテムの数を制限するのではなく、一度に取り出すことができるアイテムの数を制限するためとは少し異なります. これは引き続き unbounded を使用してジョブを実行しますが、スレッドの数を制限するためQueue.Queue
に a に依存しています。Semaphore
from Queue import Queue
from threading import BoundedSemaphore, Lock, Thread
class TaskQueue(object):
"""
Queues tasks to be run in separate threads and limits the number
concurrently running tasks.
"""
def __init__(self, limit):
"""Initializes a new instance of a TaskQueue."""
self.__semaphore = BoundedSemaphore(limit)
self.__queue = Queue()
self.__cancelled = False
self.__lock = Lock()
def enqueue(self, callback):
"""Indicates that the given callback should be ran."""
self.__queue.put(callback)
def start(self):
"""Tells the task queue to start running the queued tasks."""
thread = Thread(target=self.__process_items)
thread.start()
def stop(self):
self.__cancel()
# prevent blocking on a semaphore.acquire
self.__semaphore.release()
# prevent blocking on a Queue.get
self.__queue.put(lambda: None)
def __cancel(self):
print 'canceling'
with self.__lock:
self.__cancelled = True
def __process_items(self):
while True:
# see if the queue has been stopped before blocking on acquire
if self.__is_canceled():
break
self.__semaphore.acquire()
# see if the queue has been stopped before blocking on get
if self.__is_canceled():
break
callback = self.__queue.get()
# see if the queue has been stopped before running the task
if self.__is_canceled():
break
def runTask():
try:
callback()
finally:
self.__semaphore.release()
thread = Thread(target=runTask)
thread.start()
self.__queue.task_done()
def __is_canceled(self):
with self.__lock:
return self.__cancelled
タスク キューを明示的に停止しない限り、Python インタープリターは永久に実行されます。これは私が思っていたよりもはるかにトリッキーです。メソッドを見ると、キューにフラグ、セマフォ、ノーオペレーション コールバックstop
を設定していることがわかります。または でコードがブロックされる可能性があるため、最後の 2 つの部分が必要です。基本的に、ループが発生する可能性があるように、これらを強制的に通過させる必要があります。canceled
release
put
Semaphore
Queue
このコードは機能します。このクラスは、何千ものタスクを並行して実行しようとするサービスを実行する場合に役立ちます。マシンのスムーズな実行を維持し、アクティブなスレッドが多すぎることを OS が叫ぶのを防ぐために、このコードは同時に存在するスレッドの数を制限します。
以前に C# で同様のコードを書いたことがあります。そのコードを特定の切り詰めたものにしたのは、.NET には、CancellationToken
ほぼすべてのスレッド クラスが使用する a と呼ばれるものがあるためです。ブロッキング操作があるときはいつでも、その操作はオプションのトークンを取ります。親タスクがキャンセルされると、そのトークンでブロックされている子タスクもすぐにキャンセルされます。これは、セマフォを解放したり値をキューに入れたりして「偽装」するよりも、はるかにクリーンな終了方法のように思えます。
Pythonでこれを行う同等の方法があるかどうか疑問に思っていましたか? 非同期イベントのようなものではなく、スレッドを使用したいのは間違いありません。1 つが最大サイズで、もう 1 つが最大サイズでない2 つの s を使用して同じことを達成する方法があるかどうか疑問に思っていますが、Queue.Queue
キャンセルの処理方法はまだわかりません。