0

レイアウトを考えると:

background \
    tasks  \
        __init__.py
        generic.py
        helpers.py
    __init__.py
    _server.py
    config.py
    router.py
    server.py

_server.pyそして起動celery -A background._server worker

で関数KeyError: u'generic.adder'を呼び出そうとすると、Worker でが与えられますgeneric.adder.delay(..)

加算器関数:

ファイルgeneric.py

from background.server import app
from background.tasks.helpers import standardized_task

@standardized_task(app, name='generic.adder')
def adder(x, y):
    return x + y

..インスタンスを受け取り、Celery appTask の入出力を結果と関数を返す JSON オブジェクトに標準化する関数でラップされます。(以下に含まれます)しかし、問題は、このラッパー関数がgeneric.adderと同じファイルにある場合、問題なく動作することです-上記のようにインポートして使用すると、キーエラーがスローされます

ラッパーが関数名でname=..渡された属性を何らかの方法で変更しているため、タスクからアクセスしたときにのリテラル名が見つからないことが原因であると私は信じています。app.taskhelpers.pygeneric.adder

adder(..)また、内部_server.py(セロリ CLI から実行されるモジュール)から呼び出しようとすると、問題なく動作することに注意することも重要です。エラーがスローされるのは、分散インターフェイスを介して呼び出された場合のみです。つまり、インポートはセロリとは無関係に機能します。

ファイルhelpers.py

__author__ = 'Blake'

import types

JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]

def standardized_task(app, *args, **kwargs):
    def wrapped_task(fn):
        def wrapped_fn(*fnargs, **fnkwargs):
            throws = fnkwargs.get('throws', Exception)
            raises = fnkwargs.get('raises', False)

            if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
                raise ValueError('throws value not of type Exception: %s' % type(throws))

            result, error = None, None

            try:
                result = fn(*fnargs, **fnkwargs)

                if type(result) not in JSON_TYPES:
                    result = unicode(result)

            except throws, e:
                error = e

                if raises:
                    raise
            finally:
                return {
                    'result': result,
                    'error': str(error) if error else None,
                    'meta': {
                        'args': fnargs, 'kwargs': fnkwargs
                    }
                }

        return app.task(wrapped_fn, *args, **kwargs)
    return wrapped_task

ファイル_server.py

from background.server import app
from background.tasks.generic import *
4

1 に答える 1

0

答えは、デコレータを使用するのではなく、celery.Task を抽象クラスに拡張して使用することです。@app.task(name='...', base=MyNewAbstractTask)

次の SO 投稿は、それをよりよく説明しています。

セロリタスクとデコレータのカスタマイズ

import types

JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]

class StandardizedTask(Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        return self.inner_run(*args, **kwargs)

    def inner_run(self, *args, **kwargs):
        throws = kwargs.get('throws', Exception)
        raises = kwargs.get('raises', False)

        if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
            raise ValueError('throws value not of type Exception: %s' % type(throws))

        result, error = None, None

        try:
            result = self.run(*args, **kwargs)

            if type(result) not in JSON_TYPES:
                result = unicode(result)

        except throws, e:
            error = e

            if raises:
                raise
        finally:
            return {
                'result': result,
                'error': str(error) if error else None,
                'meta': {
                    'args': args, 'kwargs': kwargs }}
于 2015-03-12T06:55:22.983 に答える