テストベッドを使用してGoogleAppEngineアプリの単体テストを行っていますが、アプリはタスクキューを使用しています。
単体テスト中にタスクをタスクキューに送信すると、タスクはキューにあるように見えますが、タスクは実行されません。
単体テスト中にタスクを実行するにはどうすればよいですか?
テストベッドを使用してGoogleAppEngineアプリの単体テストを行っていますが、アプリはタスクキューを使用しています。
単体テスト中にタスクをタスクキューに送信すると、タスクはキューにあるように見えますが、タスクは実行されません。
単体テスト中にタスクを実行するにはどうすればよいですか?
Saxonの優れた答えを使用して、gaetestbedの代わりにtestbedを使用して同じことを行うことができました。これが私がしたことです。
私にこれを追加しましたsetUp()
:
self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')
次に、私のテストでは、次のものを使用しました。
# Execute the task in the taskqueue
tasks = self.taskqueue_stub.GetTasks("default")
self.assertEqual(len(tasks), 1)
task = tasks[0]
params = base64.b64decode(task["body"])
response = self.app.post(task["url"], params)
どこかで、POSTパラメータはbase64でエンコードされるため、動作させるにはそれを元に戻す必要がありました。
公式のテストベッドパッケージを使用でき、すべて自分のテストコード内で実行できるため、Saxonの回答よりもこれが好きです。
編集:後で、延期されたライブラリを使用して送信されたタスクで同じことをしたいと思いました。それを理解するのに少し頭を悩ませたので、他の人の苦痛を和らげるためにここで共有しています。
タスクキューに遅延で送信されたタスクのみが含まれている場合、これにより、すべてのタスクと、それらのタスクによってキューに入れられたすべてのタスクが実行されます。
def submit_deferred(taskq):
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
while tasks:
for task in tasks:
(func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
func(*args)
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
これを実現するためのもう1つの(よりクリーンな)オプションは、テストベッド内でタスクキュースタブを使用することです。setUp()
これを行うには、最初にメソッドに以下を追加してタスクキュースタブを初期化する必要があります。
self.testbed = init_testbed()
self.testbed.init_taskqueue_stub()
タスクスケジューラには、次のコードを使用してアクセスできます。
taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
キュースタブを操作するためのインターフェイスは次のとおりです。
GetQueues() #returns a list of dictionaries with information about the available queues
#returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)
DeleteTask(queue_name, task_name) #removes the task task_name from the given queue
FlushQueue(queue_name) #removes all the tasks from the queue
#returns tasks filtered by name & url pointed to by the task from the given queues
get_filtered_tasks(url, name, queue_names)
StartBackgroundExecution() #Executes the queued tasks
Shutdown() #Requests the task scheduler to shutdown.
また、これはApp Engine SDK独自の機能を使用するため、遅延ライブラリでも問題なく機能します。
開発アプリサーバーはシングルスレッドであるため、フォアグラウンドスレッドがテストを実行している間はバックグラウンドでタスクを実行できません。
gaetestbedのtaskqueue.pyのTaskQueueTestCaseを変更して、次の関数を追加しました。
def execute_tasks(self, application):
"""
Executes all currently queued tasks, and also removes them from the
queue.
The tasks are execute against the provided web application.
"""
# Set up the application for webtest to use (resetting _app in case a
# different one has been used before).
self._app = None
self.APPLICATION = application
# Get all of the tasks, and then clear them.
tasks = self.get_tasks()
self.clear_task_queue()
# Run each of the tasks, checking that they succeeded.
for task in tasks:
response = self.post(task['url'], task['params'])
self.assertOK(response)
これを機能させるには、TaskQueueTestCaseの基本クラスをBaseTestCaseからWebTestCaseに変更する必要もありました。
次に、私のテストは次のようなことを行います。
# Do something which enqueues a task.
# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)
# Now test that the task did what was expected.
したがって、これはフォアグラウンドユニットテストから直接タスクを実行します。これは本番環境とはまったく同じではありませんが(つまり、タスクは「しばらくして」別のリクエストで実行されます)、私にとっては十分に機能します。
次のコードを試してみてください。完全な説明はここにあります:http://www.geewax.org/task-queue-support-in-app-engines-ext-testbed/
import unittest
from google.appengine.api import taskqueue
from google.appengine.ext import testbed
class TaskQueueTestCase(unittest.TestCase):
def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_taskqueue_stub()
self.task_queue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
def tearDown(self):
self.testbed.deactivate()
def testTaskAdded(self):
taskqueue.add(url='/path/to/task')
tasks = self.taskqueue_stub.get_filtered_tasks(url='/path/to/task')
self.assertEqual(1, len(tasks))
self.assertEqual('/path/to/task', tasks[0].url)
unittest.main()