カスタムローダーを作成するか、シグナルを使用することができます。
ローダーにはon_task_init
、タスクが実行されようとしているときにon_worker_init
呼び出され、celery+celerybeatメインプロセスによって呼び出されるメソッドがあります。
シグナルの使用はおそらく最も簡単です。利用可能なシグナルは次のとおりです。
0.8.x:
task_prerun(task_id, task, args, kwargs)
タスクがワーカーによって実行されようとしているときにディスパッチされます(または、apply
/またはCELERY_ALWAYS_EAGER
設定されている場合はローカルで)。
task_postrun(task_id, task, args, kwargs, retval)
上記と同じ条件でタスクが実行された後にディスパッチされます。
task_sent(task_id, task, args, kwargs, eta, taskset)
タスクが適用されたときに呼び出されます(長時間実行される操作には適していません)
0.9.x(githubの現在のマスターブランチ)で利用可能な追加のシグナル:
worker_init()
celerydの開始時に呼び出されます(タスクが初期化される前。したがって、をサポートするシステムfork
では、メモリの変更はすべて子ワーカープロセスにコピーされます)。
worker_ready()
celerydがタスクを受信できるときに呼び出されます。
worker_shutdown()
celerydがシャットダウンしているときに呼び出されます。
これは、プロセスでタスクが最初に実行されるときに何かを事前計算する例です。
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
関数をすべてのタスクに対して実行する場合は、sender
引数をスキップしてください。