22

Djangoプロジェクトがあり、Celeryを使用してバックグラウンド処理用のタスクを送信しようとしています(http://ask.github.com/celery/introduction.html)。CeleryはDjangoとうまく統合されており、カスタムタスクを送信して結果を返すことができました。

唯一の問題は、デーモンプロセスでカスタム初期化を実行する正しい方法が見つからないことです。タスクの処理を開始する前に、大量のメモリをロードする高価な関数を呼び出す必要があり、その関数を毎回呼び出す余裕はありません。

誰かが以前にこの問題を抱えたことはありますか?Celeryのソースコードを変更せずに回避する方法はありますか?

ありがとう

4

1 に答える 1

21

カスタムローダーを作成するか、シグナルを使用することができます。

ローダーにはon_task_init、タスクが実行されようとしているときにon_worker_init呼び出され、celery+celerybeatメインプロセスによって呼び出されるメソッドがあります。

シグナルの使用はおそらく最も簡単です。利用可能なシグナルは次のとおりです。

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    タスクがワーカーによって実行されようとしているときにディスパッチされます(または、apply/またはCELERY_ALWAYS_EAGER設定されている場合はローカルで)。

  • task_postrun(task_id, task, args, kwargs, retval) 上記と同じ条件でタスクが実行された後にディスパッチされます。

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    タスクが適用されたときに呼び出されます(長時間実行される操作には適していません)

0.9.x(githubの現在のマスターブランチ)で利用可能な追加のシグナル:

  • worker_init()

    celerydの開始時に呼び出されます(タスクが初期化される前。したがって、をサポートするシステムforkでは、メモリの変更はすべて子ワーカープロセスにコピーされます)。

  • worker_ready()

    celerydがタスクを受信できるときに呼び出されます。

  • worker_shutdown()

    celerydがシャットダウンしているときに呼び出されます。

これは、プロセスでタスクが最初に実行されるときに何かを事前計算する例です。

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2
tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table: # it's empty, so haven't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])

関数をすべてのタスクに対して実行する場合は、sender引数をスキップしてください。

于 2010-01-27T09:07:57.527 に答える