Celery のドキュメントでは、Django 内での Celery のテストについて言及していますが、Django を使用していない場合に Celery タスクをテストする方法については説明していません。これどうやってやるの?
質問する
66585 次
10 に答える
74
そこにあるユニットテストライブラリを使用して、タスクを同期的にテストすることができます。セロリのタスクを扱うときは、通常、2 つの異なるテスト セッションを行います。最初のもの (以下で提案しているように) は完全に同期的であり、アルゴリズムが本来すべきことを確実に実行するようにする必要があります。2 番目のセッションでは、システム全体 (ブローカーを含む) を使用し、シリアライゼーションの問題やその他の配布、通信の問題がないことを確認します。
そう:
from celery import Celery
celery = Celery()
@celery.task
def add(x, y):
return x + y
そしてあなたのテスト:
from nose.tools import eq_
def test_add_task():
rst = add.apply(args=(4, 4)).get()
eq_(rst, 8)
それが役立つことを願っています!
于 2012-08-22T20:33:59.767 に答える
39
何をテストしたいかによって異なります。
- タスク コードを直接テストします。「task.delay(...)」を呼び出さないでください。単体テストから「task(...)」を呼び出すだけです。
- CELERY_ALWAYS_EAGERを使用します。これにより、「task.delay(...)」と言った時点でタスクがすぐに呼び出されるため、パス全体をテストできます (ただし、非同期動作はテストできません)。
于 2012-08-22T20:29:48.537 に答える
33
単体テスト
import unittest
from myproject.myapp import celeryapp
class TestMyCeleryWorker(unittest.TestCase):
def setUp(self):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
py.test フィクスチャ
# conftest.py
from myproject.myapp import celeryapp
@pytest.fixture(scope='module')
def celery_app(request):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
return celeryapp
# test_tasks.py
def test_some_task(celery_app):
...
補遺: send_task を熱心に尊重させる
from celery import current_app
def send_task(name, args=(), kwargs={}, **opts):
# https://github.com/celery/celery/issues/581
task = current_app.tasks[name]
return task.apply(args, kwargs, **opts)
current_app.send_task = send_task
于 2016-03-04T22:10:47.687 に答える
31
Celery 4 の場合は次のとおりです。
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
設定名が変更されており、アップグレードする場合は更新する必要があるため、 を参照してください。
于 2016-12-20T18:49:26.303 に答える
17
Celery 3.0の時点でCELERY_ALWAYS_EAGER
、Djangoで設定する 1 つの方法は次のとおりです。
from django.test import TestCase, override_settings
from .foo import foo_celery_task
class MyTest(TestCase):
@override_settings(CELERY_ALWAYS_EAGER=True)
def test_foo(self):
self.assertTrue(foo_celery_task.delay())
于 2015-10-27T13:30:32.983 に答える