16

私はdjangoとセロリ(django-celery)を使ったプロジェクトに取り組んでいます。私たちのチームは、すべてのデータ アクセス コードを(app-name)/manager.py(このように Manager にラップするのではなくdjango) ラップし、コードを (app-name)/task.py に配置して、セロリを使用したタスクのアセンブルと実行のみを処理することにしました (したがって、django はありません)。このレイヤーの ORM 依存関係)。

manager.pyの には、次のようなものがあります。

def get_tag(tag_name):
    ctype = ContentType.objects.get_for_model(Photo)
    try:
        tag = Tag.objects.get(name=tag_name)
    except ObjectDoesNotExist:
        return Tag.objects.none()
    return tag

def get_tagged_photos(tag):
    ctype = ContentType.objects.get_for_model(Photo)
    return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):
    return get_tagged_photos(tag).count()

私の task.py では、それらをタスクにラップするのが好きです (その後、これらのタスクを使用してより複雑なタスクを実行する可能性があります)。そのため、次のデコレータを記述します。

import manager #the module within same app containing data access functions

class mfunc_to_task(object):
    def __init__(mfunc_type='get'):
        self.mfunc_type = mfunc_type

    def __call__(self, f):
        def wrapper_f(*args, **kwargs):
            callback = kwargs.pop('callback', None)

            mfunc = getattr(manager, f.__name__)

            result = mfunc(*args, **kwargs)
            if callback:
                if self.mfunc_type == 'get':
                    subtask(callback).delay(result)
                elif self.mfunc_type == 'get_or_create':
                    subtask(callback).delay(result[0])
                else:
                    subtask(callback).delay()
            return result            

        return wrapper_f

次に (まだtask.py):

#@task
@mfunc_to_task()
def get_tag():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos_count():
    pass

なしでも問題なく動作し@taskます。しかし、その@taskデコレータを(セロリのドキュメントで指示されているように一番上に)適用した後、物事はバラバラになり始めます。どうやら、 が呼び出されるたびmfunc_to_task.__call__に、同じtask.get_tag関数が として渡されfます。wrapper_fそのため、毎回同じ結果になりましたが、今では、単一のタグを取得することしかできません。

私はデコレータが初めてです。ここで何がうまくいかなかったのかを理解するのを手伝ってくれる人、またはタスクを達成するための他の方法を指摘してくれる人はいますか? 私は、すべてのデータ アクセス関数に対して同じタスク ラップ コードを記述するのが本当に嫌いです。

4

2 に答える 2

20

引数を渡すことがうまくいかない理由がよくわかりませんか?

この例を使用する場合:

@task()
def add(x, y):
    return x + y

MyCoolTask​​ にログを追加します。

from celery import task
from celery.registry import tasks

import logging
import celery

logger = logging.getLogger(__name__)

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        logger.info("Ending run")
        pass

拡張クラスを作成します (MyCoolTask​​ を拡張しますが、現在は引数付きです):

class AddTask(MyCoolTask):

    def run(self,x,y):
        if x and y:
            result=add(x,y)
            logger.info('result = %d' % result)
            return result
        else:
            logger.error('No x or y in arguments')

tasks.register(AddTask)

kwargs を json データとして渡すようにしてください。

{"x":8,"y":9}

私は結果を得る:

[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17
于 2013-03-05T16:36:12.943 に答える
9

デコレータを使用する代わりに、拡張する基本クラスを作成しないのはなぜcelery.Taskですか?

このようにして、すべてのタスクはカスタマイズされたタスク クラスを拡張でき、メソッド__call__とを使用して個人の動作を実装できますafter_return 。すべてのタスクに共通のメソッドとオブジェクトを定義することもできます。

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        pass
于 2011-06-18T07:29:39.333 に答える