6

task_postrun シグナルで raisin SystemExit() によってメインのセロリ プロセスをシャットダウンしようとしています。シグナルは問題なく発生し、例外が発生しますが、ワーカーが完全に終了することはなく、そこでハングします。

これを機能させるにはどうすればよいですか?

どこかで設定を忘れていませんか?

以下は、ワーカー (worker.py) に使用しているコードです。

from celery import Celery
from celery import signals

app = Celery('tasks',
    set_as_current = True,
    broker='amqp://guest@localhost//',
    backend="mongodb://localhost//",
)

app.config_from_object({
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_POOL": "solo",
    "CELERY_SEND_EVENTS": True,
    "CELERYD_CONCURRENCY": 1,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
})

def shutdown_worker(**kwargs):
    print("SHUTTING DOWN WORKER (RAISING SystemExit)")
    raise SystemExit()

import tasks

signals.task_postrun.connect(shutdown_worker)

print("STARTING WORKER")
app.worker_main()
print("WORKER EXITED!")

以下は、tasks.py のコードです。

from celery import Celery,task
from celery.signals import task_postrun
import time

from celery.task import Task

class Blah(Task):
    track_started = True
    acks_late = False

    def run(config, kwargs):
        time.sleep(5)
        return "SUCCESS FROM BLAH"

def get_app():
    celery = Celery('tasks',
        broker='amqp://guest@localhost//',
        backend="mongodb://localhost//"
    )
    return celery

これをテストするには、まずワーカー コード ( python worker.py) を実行し、次のようにタスクをキューに入れます。

>>> import tasks
>>> results = []
>>> results.append(tasks.Blah.delay({}))

ワーカーからの出力は次のとおりです。

STARTING WORKER

 -------------- celery@hostname v3.0.9 (Chiastic Slide)
---- **** ----- 
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         tasks:0x26f3050
- ** ---------- . concurrency: 1 (solo)
- ** ---------- . events:      ON
- ** ---------- 
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** ----- 

[2012-11-06 15:59:16,761: WARNING/MainProcess] celery@hostname has started.
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit)

Pythonコードが呼び出しから戻ってからapp.worker_main()印刷さWORKER EXITEDれ、プロセスが完全に終了することを期待していました。これは決して起こらず、それをなくすにはワーカー プロセスをkill -KILL {PID}編集する必要があります (他のタスクを消費することもできません。

私の主な質問は次のようになると思います:

からコードを返すにはどうすればよいapp.worker_main()ですか?

いくつかのタスクが実行された後、(プロセスを完全に終了させることによって)ワーカープロセスを完全に再起動できるようにしたいと考えています。X

UPDATEワーカーが何にハングアップしているかを把握しました-ワーカー(例外をキャッチした後WorkController、への呼び出しにハングself.stopSystemExit

4

1 に答える 1

2

自分の質問に答えてしまうのは嫌です。

Mediatorとにかく、それはコンポーネント内の結合呼び出しでブロックされていましたWorkController(コンポーネントの呼び出しstop()Mediator内部停止、それjoin)。

すべてのレート制限を無効にすることでMediatorコンポーネントを削除しました (デフォルトではこれになっているはずですが、何らかの理由でそうではありません)。

次の設定を使用して、すべてのレート制限を無効にすることができます。

CELERY_DISABLE_ALL_RATE_LIMITS: True

これが他の誰かにも役立つことを願っています。

平和

于 2012-11-07T14:30:55.393 に答える