84

私は、 django-celeryプロジェクトのテスト方法を考え出そうとしています。ドキュメントの注記を読みましたが、実際に何をすべきかについての良いアイデアが得られませんでした。実際のデーモンでタスクをテストすることについては心配していません。コードの機能だけです。主に私は疑問に思っています:

  1. テスト中にバイパスするにはどうすればよいですかtask.delay()(設定を試みCELERY_ALWAYS_EAGER = Trueましたが、違いはありませんでした)。
  2. 実際に settings.py を変更せずに、推奨されるテスト設定をどのように使用するのですか (それが最善の方法である場合)。
  3. manage.py testカスタムランナーを引き続き使用できますか、それとも使用する必要がありますか?

全体として、セロリでテストするためのヒントやヒントは非常に役立ちます.

4

6 に答える 6

73

セロリの結果を完了する必要があるテストでは、override_settings デコレーターを使用するのが好きです。

from django.test import TestCase
from django.test.utils import override_settings
from myapp.tasks import mytask

class AddTestCase(TestCase):

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        result = mytask.delay()
        self.assertTrue(result.successful())

これをすべてのテストに適用する場合は、http://docs.celeryproject.org/en/2.5/django/unit-testing.htmlで説明されているように、セロリ テスト ランナーを使用できますBROKER_BACKEND = 'memory'

設定:

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

CeleryTestSuiteRunner のソースを見ると、何が起きているかがよくわかります。

于 2012-12-19T05:08:11.037 に答える
46

設定してみてください:

BROKER_BACKEND = 'memory'

asksolのコメントに感謝します。)

于 2011-05-02T23:19:53.987 に答える
18

apply_asyncこれは、メソッドをスタブ化し、それへの呼び出しを記録する私のテスト基本クラスからの抜粋です(これにはTask.delay.

from django.test import TestCase
from celery.task.base import Task
# For recent versions, Task has been moved to celery.task.app:
# from celery.app.task import Task
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

class CeleryTestCaseBase(TestCase):

    def setUp(self):
        super(CeleryTestCaseBase, self).setUp()
        self.applied_tasks = []

        self.task_apply_async_orig = Task.apply_async

        @classmethod
        def new_apply_async(task_class, args=None, kwargs=None, **options):
            self.handle_apply_async(task_class, args, kwargs, **options)

        # monkey patch the regular apply_sync with our method
        Task.apply_async = new_apply_async

    def tearDown(self):
        super(CeleryTestCaseBase, self).tearDown()

        # Reset the monkey patch to the original method
        Task.apply_async = self.task_apply_async_orig

    def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
        self.applied_tasks.append((task_class, tuple(args), kwargs))

    def assert_task_sent(self, task_class, *args, **kwargs):
        was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
                       for task in self.applied_tasks)
        self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args))

    def assert_task_not_sent(self, task_class):
        was_sent = any(task_class == task[0] for task in self.applied_tasks)
        self.assertFalse(was_sent, 'Task was not expected to be called, but was.  Applied tasks: %s' %                 self.applied_tasks)

これは、テスト ケースでどのように使用するかを示す "頭から離れた" 例です。

mymodule.py

from my_tasks import SomeTask

def run_some_task(should_run):
    if should_run:
        SomeTask.delay(1, some_kwarg=2)

test_mymodule.py

class RunSomeTaskTest(CeleryTestCaseBase):
    def test_should_run(self):
        run_some_task(should_run=True)
        self.assert_task_sent(SomeTask, 1, some_kwarg=2)

    def test_should_not_run(self):
        run_some_task(should_run=False)
        self.assert_task_not_sent(SomeTask)
于 2010-12-30T02:52:08.623 に答える
4

これはまだ検索結果に表示されるため、設定は次のように上書きされます

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

Celery Docsに従って私のために働いた

于 2014-10-23T00:20:56.317 に答える