14

プロジェクトで django-rq の使用を開始します。

Redis ベースの Python キューイング ライブラリである RQ との Django の統合。

RQ を使用している django アプリをテストするベスト プラクティスは何ですか?

たとえば、アプリをブラック ボックスとしてテストする場合、ユーザーがいくつかのアクションを実行した後、現在のキューですべてのジョブを実行し、DB ですべての結果を確認します。私のdjango-testsでそれを行うにはどうすればよいですか?

4

7 に答える 7

9

を見つけましdjango-rqた。これにより、テスト環境でワーカーをスピンアップし、キューでタスクを実行して終了することができます。

from django.test impor TestCase
from django_rq import get_worker

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        ...                      # Stuff that init jobs.
        get_worker().work(burst=True)  # Processes all jobs then stop.
        ...                      # Asserts that the job stuff is done.
于 2012-09-05T02:22:58.537 に答える
4

rqテストをいくつかの部分に分けました。

  1. (モックを使用して) キューに正しく追加されていることをテストします。
  2. 何かがキューに追加された場合、それは最終的に処理されると仮定します。(rqのテスト スイートはこれをカバーする必要があります)。
  3. テスト、正しい入力が与えられた場合、私のタスクは期待どおりに機能します。(通常のコード テスト)。

テスト中のコード:

def handle(self, *args, **options):
    uid = options.get('user_id')

    # @@@ Need to exclude out users who have gotten an email within $window
    # days.
    if uid is None:
        uids = User.objects.filter(is_active=True, userprofile__waitlisted=False).values_list('id', flat=True)
    else:
        uids = [uid]

    q = rq.Queue(connection=redis.Redis())

    for user_id in uids:
        q.enqueue(mail_user, user_id)

私のテスト:

class DjangoMailUsersTest(DjangoTestCase):
    def setUp(self):
        self.cmd = MailUserCommand()

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_no_userid_queues_all_userids(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        u2 = UserF.create(userprofile__waitlisted=False)
        self.cmd.handle()
        self.assertItemsEqual(queue.return_value.enqueue.mock_calls,
                              [call(ANY, u1.pk), call(ANY, u2.pk)])

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_waitlisted_people_excluded(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        UserF.create(userprofile__waitlisted=True)
        self.cmd.handle()
        self.assertItemsEqual(queue.return_value.enqueue.mock_calls, [call(ANY, u1.pk)])
于 2013-01-26T19:07:31.807 に答える
1

次のことができるようにするパッチをコミットしました。

from django.test impor TestCase
from django_rq import get_queue

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        queue = get_queue(async=False)
        queue.enqueue(func) # func will be executed right away
        # Test for job completion

これにより、RQ ジョブのテストが容易になります。それが役立つことを願っています!

于 2013-03-22T01:53:25.173 に答える
0

念のため、これは誰にとっても役立つでしょう。カスタム モック オブジェクトを含むパッチを使用して、すぐに実行されるエンキューを行いました

#patch django_rq.get_queue
with patch('django_rq.get_queue', return_value=MockBulkJobGetQueue()) as mock_django_rq_get_queue:
    #Perform web operation that starts job. In my case a post to a url

次に、モック オブジェクトにはメソッドが 1 つだけあります。

class MockBulkJobGetQueue(object):

    def enqueue(self, f, *args, **kwargs):
        # Call the function
        f(
            **kwargs.pop('kwargs', None)
        )
于 2016-07-31T18:31:00.097 に答える
0

キューにまだジョブがある間は、テストを一時停止する必要があります。これを行うには、 を確認Queue.is_empty()し、キューにまだジョブがある場合は実行を一時停止します。

import time
from django.utils.unittest import TestCase
import django_rq

class TestQueue(TestCase):

def test_something(self):
    # simulate some User actions which will queue up some tasks

    # Wait for the queued tasks to run
    queue = django_rq.get_queue('default')
    while not queue.is_empty():
        time.sleep(5) # adjust this depending on how long your tasks take to execute

    # queued tasks are done, check state of the DB
    self.assert(.....)
于 2012-08-07T12:07:43.320 に答える