2

一度に実行されるタスクの数を制限するタスク キューを Python で作成しました。Queue.Queueこれは、キューに入れることができるアイテムの数を制限するのではなく、一度に取り出すことができるアイテムの数を制限するためとは少し異なります. これは引き続き unbounded を使用してジョブを実行しますが、スレッドの数を制限するためQueue.Queueに a に依存しています。Semaphore

from Queue import Queue
from threading import BoundedSemaphore, Lock, Thread


class TaskQueue(object):
    """
    Queues tasks to be run in separate threads and limits the number
    concurrently running tasks.

    """

    def __init__(self, limit):
        """Initializes a new instance of a TaskQueue."""
        self.__semaphore = BoundedSemaphore(limit)
        self.__queue = Queue()
        self.__cancelled = False
        self.__lock = Lock()

    def enqueue(self, callback):
        """Indicates that the given callback should be ran."""
        self.__queue.put(callback)

    def start(self):
        """Tells the task queue to start running the queued tasks."""
        thread = Thread(target=self.__process_items)
        thread.start()

    def stop(self):
        self.__cancel()
        # prevent blocking on a semaphore.acquire
        self.__semaphore.release()
        # prevent blocking on a Queue.get
        self.__queue.put(lambda: None)

    def __cancel(self):
        print 'canceling'
        with self.__lock:
            self.__cancelled = True

    def __process_items(self):
        while True:
            # see if the queue has been stopped before blocking on acquire
            if self.__is_canceled():
                break

            self.__semaphore.acquire()

            # see if the queue has been stopped before blocking on get
            if self.__is_canceled():
                break

            callback = self.__queue.get()

            # see if the queue has been stopped before running the task
            if self.__is_canceled():
                break

            def runTask():
                try:
                    callback()
                finally:
                    self.__semaphore.release()

            thread = Thread(target=runTask)
            thread.start()
            self.__queue.task_done()

    def __is_canceled(self):
        with self.__lock:
            return self.__cancelled

タスク キューを明示的に停止しない限り、Python インタープリターは永久に実行されます。これは私が思っていたよりもはるかにトリッキーです。メソッドを見ると、キューにフラグ、セマフォ、ノーオペレーション コールバックstopを設定していることがわかります。または でコードがブロックされる可能性があるため、最後の 2 つの部分が必要です。基本的に、ループが発生する可能性があるように、これらを強制的に通過させる必要があります。canceledreleaseputSemaphoreQueue

このコードは機能します。このクラスは、何千ものタスクを並行して実行しようとするサービスを実行する場合に役立ちます。マシンのスムーズな実行を維持し、アクティブなスレッドが多すぎることを OS が叫ぶのを防ぐために、このコードは同時に存在するスレッドの数を制限します。

以前に C# で同様のコードを書いたことがあります。そのコードを特定の切り詰めたものにしたのは、.NET には、CancellationTokenほぼすべてのスレッド クラスが使用する a と呼ばれるものがあるためです。ブロッキング操作があるときはいつでも、その操作はオプションのトークンを取ります。親タスクがキャンセルされると、そのトークンでブロックされている子タスクもすぐにキャンセルされます。これは、セマフォを解放したり値をキューに入れたりして「偽装」するよりも、はるかにクリーンな終了方法のように思えます。

Pythonでこれを行う同等の方法があるかどうか疑問に思っていましたか? 非同期イベントのようなものではなく、スレッドを使用したいのは間違いありません。1 つが最大サイズで、もう 1 つが最大サイズでない2 つの s を使用して同じことを達成する方法があるかどうか疑問に思っていますが、Queue.Queueキャンセルの処理方法はまだわかりません。

4

3 に答える 3

4

ポイズニングとを使用することで、コードを簡素化できると思いますThread.join()

from Queue import Queue
from threading import Thread

poison = object()

class TaskQueue(object):

    def __init__(self, limit):
        def process_items():
            while True:
                callback = self._queue.get()
                if callback is poison:
                    break
                try:
                    callback()
                except:
                    pass
                finally:
                    self._queue.task_done()
        self._workers = [Thread(target=process_items) for _ in range(limit)]
        self._queue = Queue()

    def enqueue(self, callback):
        self._queue.put(callback)

    def start(self):
        for worker in self._workers:
            worker.start()

    def stop(self):
        for worker in self._workers:
            self._queue.put(poison)
        while self._workers:
            self._workers.pop().join()

未テスト。

簡潔にするために、コメントを削除しました。

また、このバージョンprocess_items()では本当に非公開です。

ところで:Queueモジュールの要点は、恐ろしいロックやイベントから解放することです。

于 2012-09-17T02:52:09.577 に答える
1

キューからタスクごとに新しいスレッドを作成しているようです。これはそれ自体が無駄であり、スレッド数をどのように制限するかという問題にもつながります。

代わりに、一定数のワーカー スレッドを作成し、キューからタスクを自由にプルできるようにする方法が一般的です。キューをキャンセルするには、キューをクリアして、将来の作業を見越してワーカーを存続させます。

于 2012-09-16T19:02:16.820 に答える
0

Janne Karila のアドバイスを受けて、スレッド プールを作成しました。これにより、セマフォが不要になりました。問題は、キューがなくなると予想される場合は、ワーカー スレッドの実行を停止する必要があることです (以前に行ったことのバリエーションにすぎません)。新しいコードはかなり似ています:

class TaskQueue(object):
    """
    Queues tasks to be run in separate threads and limits the number
    concurrently running tasks.

    """

    def __init__(self, limit):
        """Initializes a new instance of a TaskQueue."""
        self.__workers = []
        for _ in range(limit):
            worker = Thread(target=self.__process_items)
            self.__workers.append(worker)
        self.__queue = Queue()
        self.__cancelled = False
        self.__lock = Lock()
        self.__event = Event()

    def enqueue(self, callback):
        """Indicates that the given callback should be ran."""
        self.__queue.put(callback)

    def start(self):
        """Tells the task queue to start running the queued tasks."""
        for worker in self.__workers:
            worker.start()

    def stop(self):
        """
        Stops the queue from processing anymore tasks. Any actively running
        tasks will run to completion.

        """
        self.__cancel()
        # prevent blocking on a Queue.get
        for _ in range(len(self.__workers)):
            self.__queue.put(lambda: None)
            self.__event.wait()

    def __cancel(self):
        with self.__lock:
            self.__queue.queue.clear()
            self.__cancelled = True

    def __process_items(self):
        while True:
            callback = self.__queue.get()

            # see if the queue has been stopped before running the task
            if self.__is_canceled():
                break

            try:
                callback()
            except:
                pass
            finally:
                self.__queue.task_done()
        self.__event.set()

    def __is_canceled(self):
        with self.__lock:
            return self.__cancelled

よく見ると、従業員を殺すために会計処理をしなければなりませんでした。私は基本的にEventワーカーの数だけ待ちます。Iclearは、ワーカーが他の方法でキャンセルされるのを防ぐための基になるキューです。また、各偽の値をキューに送り込んだ後も待機するため、一度に 1 つのワーカーだけがキャンセルできます。

これについていくつかのテストを実行しましたが、機能しているようです。偽の値の必要性を排除することは、依然として素晴らしいことです。

于 2012-09-17T02:12:46.290 に答える