129

Celery のドキュメントでは、Django 内での Celery のテストについて言及していますが、Django を使用していない場合に Celery タスクをテストする方法については説明していません。これどうやってやるの?

4

10 に答える 10

74

そこにあるユニットテストライブラリを使用して、タスクを同期的にテストすることができます。セロリのタスクを扱うときは、通常、2 つの異なるテスト セッションを行います。最初のもの (以下で提案しているように) は完全に同期的であり、アルゴリズムが本来すべきことを確実に実行するようにする必要があります。2 番目のセッションでは、システム全体 (ブローカーを含む) を使用し、シリアライゼーションの問題やその他の配布、通信の問題がないことを確認します。

そう:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

そしてあなたのテスト:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

それが役立つことを願っています!

于 2012-08-22T20:33:59.767 に答える
39

何をテストしたいかによって異なります。

  • タスク コードを直接テストします。「task.delay(...)」を呼び出さないでください。単体テストから「task(...)」を呼び出すだけです。
  • CELERY_ALWAYS_EAGERを使用します。これにより、「task.delay(...)」と言った時点でタスクがすぐに呼び出されるため、パス全体をテストできます (ただし、非同期動作はテストできません)。
于 2012-08-22T20:29:48.537 に答える
33

単体テスト

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test フィクスチャ

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

補遺: send_task を熱心に尊重させる

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
于 2016-03-04T22:10:47.687 に答える
31

Celery 4 の場合は次のとおりです。

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

設定名が変更されており、アップグレードする場合は更新する必要があるため、 を参照してください。

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

于 2016-12-20T18:49:26.303 に答える
17

Celery 3.0の時点でCELERY_ALWAYS_EAGERDjangoで設定する 1 つの方法は次のとおりです。

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())
于 2015-10-27T13:30:32.983 に答える