2

Pyramid アプリケーションにワーカー プロセスのプールを持たせようとしています。これを使用して、ビューを停止させたくない CPU 集中型 (または長時間実行) のバックグラウンド タスクを実行できます。私が今持っているものは機能しますが、問題が 1 つあります。ウェイトレスが終了から終了した場合 (--reload で発生するように)、ワーカーは長続きし、停止するように通知する方法がわかりません。

編集:これは、Gunicornを使用している場合(またはファイルから実行している場合)は問題にならないようです。これはウェイトレスのバグでしょうか?

Edit2 : そうですね。または、Gunicorn はこれを別の方法で処理して、見栄えを良くしています。

import multiprocessing as mp
from queue import Empty as QueueEmptyError

class MyPool(object):

    def __init__(self, processes=10, queue=None):
        self.jobqueue = queue if queue is not None else mp.Queue()
        self.procs = []
        self.running = True
        for i in range(processes):
            worker = mp.Process(target=self._worker, daemon=True)
            self.procs.append(worker)
            worker.start()

    def __del__(self):
        self.stopall()

    def stopall(self):
        self.running = False
        for worker in self.procs:
            worker.join()

    def _worker(self):
        while self.running:
            try:
                self._dojob(self.jobqueue.get(True, 1))
            except QueueEmptyError:
                pass

    def _dojob(self, taskdata):
        print(str(taskdata) + ' is happening')

class PoolMaster(object):
    def __init__(self):
        self.pools = []
        self.aqueue = mp.Queue()
        self.apool = MyPool(6, self.aqueue)
        self.pools.append(self.apool)

    def __del__(self):
        for pool in self.pools:
            pool.stopall()

    def do_something(self):
        self.aqueue.put_nowait('Something')

PoolMaster は、プロジェクトの main() 関数で一度インスタンス化され、すべてのイベントに追加することですべてのビューに公開されます。

私が以前に試したのは、「ポイズンピル」__del__が発生したときにキューに追加することでした__del__が、結局のところ、まったく呼び出されないようです。マルチプロセッシング独自のプールを使用したくありません。これは、キューで常に作業するのではなく、設定されたワークロードを一度実行するために作成されているように見えるためです。では、実際のアプリケーションが終了した後にそれらの実行を停止するにはどうすればよいでしょうか?

4

2 に答える 2

0

(たとえば、 )Pipe内の各ワーカーに を使用できます。次に、ワーカーは定期的に をリッスンして何らかのキャンセル コード (たとえば) を探します。ワーカー プロセスでパイプを使用するには、次のようにします。MyPoolcancelPipePipeTrue

connections = mp.Pipe()
self.cancelPipes.append(connections[0])
worker = mp.Process(target=self._worker, daemon=True, args=[connections[1]])

次に、全員をキャンセルしたい場合は、次のようにします。

for c in self.cancelPipes:
    c.send(True)

ワーカーは、これで停止する必要があるかどうかを確認します。

gotSomething = cancelPipe.poll()
if gotSomething:
    cancel = cancelPipe.recv()
    if cancel:
        # Stop your worker
于 2013-09-14T16:52:49.227 に答える