7

これをスタンドアロン アプリケーションとして実行することはできますが、Django で動作させるのに問題があります。

スタンドアロンのコードは次のとおりです。

from celery import Celery
from celery.schedules import crontab


app = Celery('tasks')
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TIMEZONE='US/Central',
    CELERY_ENABLE_UTC=True,
    CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'tasks.test',
        'schedule': crontab(),
        },
    }
)

@app.task
def test():
    with open('test.txt', 'a') as f:
        f.write('Hello, World!\n')`

Rabbitmq サーバーにフィードし、毎分ファイルに書き込みます。それは魅力のように機能しますが、Django で機能させようとすると、次のエラーが発生します。

このタスクを含むモジュールをインポートしたことを覚えていますか? それとも、相対インポートを使用していますか? 詳しくは____をご覧ください。

メッセージ本文の完全な内容: {'retries': 0, 'eta': None, 'kwargs': {}, 'taskset': None, 'timelimit': [None, None], 'callbacks': None , 'task': 'proj.test', 'args': [], 'expires': なし, 'id': '501ca998-b5eb-4ba4-98a8-afabda9e88dd', 'utc': True, 'errbacks':なし、「chord」: なし} (246b) トレースバック (最新の呼び出しが最後): ファイル "/home/user/CeleryDjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py"、行456、on_task_received 戦略 [名前] (メッセージ、本文、KeyError: 'proj.test' [2016-06-16 01:16:00,051: INFO/Beat] スケジューラ: 期日タスク テスト (proj.test) の送信 [2016- 06-16 01:16:00,055: エラー/MainProcess] タイプ 'proj.test' の未登録のタスクを受け取りました。

そして、これはDjangoでの私のコードです:

# CELERY STUFF
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'US/Central'
CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'proj.test',
        'schedule': crontab(),
    }
}

セロリ.py

from __future__ import absolute_import

import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings  # noqa

app = Celery('proj')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

task.py

from __future__ import absolute_import
from celery import shared_task


@shared_task
def test():
    with open('test.txt', 'w') as f:
        print('Hello, World', file=f)

init.py

from __future__ import absolute_import

from .celery import app as celery_app 

これについてのご意見は大歓迎です。ありがとう。

4

1 に答える 1

14

次のように試してみて、うまくいったかどうか教えてください。それは私にとってはうまくいきます。

settings.py で

CELERYBEAT_SCHEDULE = {
    'my_scheduled_job': {
        'task': 'run_scheduled_jobs', # the same goes in the task name
        'schedule': crontab(),
    },
}

そして、tasks.py..

from celery.task import task # notice the import of task and not shared task. 

@task(name='run_scheduled_jobs') # task name found! celery will do its job
def run_scheduled_jobs():
    # do whatever stuff you do
    return True

ただし、shared_task を探している場合は..

@shared_task(name='my_shared_task') # name helps celery identify the functions it has to run
def my_shared_task():
    # do what you want here..
    return True

非同期ジョブに共有タスクを使用します..そのため、次のような関数から呼び出す必要があります..

プロジェクトアプリのviews.py /またはanywhere.py

def some_function():
    my_shared_task.apply_async(countdown= in_seconds)
    return True

忘れた場合に備えて、実行しようとしているアプリをタスクに含めることを忘れないでください..

INSTALLED_APPS = [...
 'my_app'...
] # include app

私はこのアプローチがうまくいくと確信しています..ありがとう

于 2016-06-16T06:04:33.643 に答える