django-nose を使用して、セロリ タスクの単体テストを作成しています。それはかなり典型的です。テスト時にフィクスチャを介して事前入力される空のテスト データベース (REUSE_DB=0)。
私が抱えている問題は、TestCase がフィクスチャをロードしていて、テスト メソッドからオブジェクトにアクセスできるにもかかわらず、非同期セロリ タスク内で実行すると同じクエリが失敗することです。
settings.DATABASES["default"]["name"] がテストメソッドとテスト中のタスクの両方で同じであることを確認しました。また、テスト中のタスクが通常のメソッド呼び出しとして呼び出されたときに正しく動作することも検証しました。
そして、それは私がアイデアを失っているところです。
サンプルは次のとおりです。
class MyTest(TestCase):
fixtures = ['test_data.json']
def setUp(self):
settings.CELERY_ALWAYS_EAGER = True # seems to be required; if not I get socket errors for Rabbit
settings.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True # exposes errors in the code under test.
def test_city(self):
self.assertIsNotNone(City.objects.get(name='brisbane'))
myTask.delay(city_name='brisbane').get()
# The following works fine: myTask('brisbane')
from celery.task import task
@task()
def myTask(city_name):
c = City.objects.count() # gives 0
my_city = City.objects.get(name=city_name) # raises DoesNotExist exception
return