1

セロリのドキュメントのインスタンス化セクション ( http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-task-classes ) には、次のように記載されています。

タスクはリクエストごとにインスタンス化されるわけではありませんが、グローバル インスタンスとしてタスク レジストリに登録されます。

これは、init コンストラクターがプロセスごとに 1 回だけ呼び出されること、およびタスク クラスが意味的にアクタに近いことを意味します。

それにもかかわらず、次の例を実行すると、initメソッドが少なくとも 3 回呼び出されていることがわかります。セットアップの何が問題になっていますか? CELERYD_CONCURRENCY = 1ワーカーごとにプロセスが 1 つだけであることを確認する必要がありますよね?

$ celery -A proj beat

celery beat v3.1.17 (Cipater) is starting.
init Task1
40878160
x=1.0
init Task1
40878352
x=1.0
init Task1
40879312
x=1.0
__    -    ... __   -        _
Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2015-02-05 23:05:21,875: INFO/MainProcess] beat: Starting...
[2015-02-05 23:05:21,971: INFO/MainProcess] Scheduler: Sending due task    task1-every-5-seconds (proj.tasks.t1)
[2015-02-05 23:05:26,972: INFO/MainProcess] Scheduler: Sending due task task1-every-5-seconds (proj.tasks.t1)

セロリ.py:

from __future__ import absolute_import
from datetime import timedelta
from celery import Celery

app = Celery('proj',
             broker='amqp://guest@localhost//',
             backend='amqp://',
             include=['proj.tasks'])
app.conf.update(
    CELERY_REDIRECT_STDOUTS=True,
    CELERY_TASK_RESULT_EXPIRES=60,
    CELERYD_CONCURRENCY = 1,
    CELERYBEAT_SCHEDULE = {
        'task1-every-5-seconds': {
            'task': 'proj.tasks.t1',
            'schedule': timedelta(seconds=5)
            },
        },
    CELERY_TIMEZONE = 'GMT',
)

if __name__ == '__main__':
    app.start()

タスク.py:

from __future__ import absolute_import
from proj.celery import app
from celery import Task
import time

class Foo():
    def __init__(self, x):
        self.x = x

class Task1(Task):
    abstract = True
    def __init__(self):
        print "init Task1"
        print id(self)
        self.f = Foo(1.0)
        print "x=1.0"

@app.task(base=Task1)
def t1():
    t1.f.x +=1
    print t1.f.x
4

1 に答える 1

0

したがって、コメントによると、スレッドごとに 1 つの接続を維持する必要があります。

それでは、なぜスレッドストレージを使用しないのでしょうか? あなたの場合、それは安全な解決策でなければなりません。

from threading import local

thread_storage = local()

def get_or_create_conntection(*args, **kwargs):
    if not hasattr(thread_storage, 'connection'):
        thread_storage.connection = Connection(*args, **kwargs)
    return thread_storage.connection

@app.task()
def do_stuff():
    connection = get_or_create_connection('some', connection='args')
    connection.ping()
于 2015-02-05T08:02:40.973 に答える