15

以前に実行されたタスクがわからない場合、タスクの結果を取得するにはどうすればよいですか? セットアップは次のとおりです。次のソース(「tasks.py」)が与えられます:

from celery import Celery

app = Celery('tasks', backend="db+mysql://u:p@localhost/db", broker = 'amqp://guest:guest@localhost:5672//')

@app.task
def add(x,y):
   return x + y


@app.task
def mul(x,y):
   return x * y

RabbitMQ 3.3.2 をローカルで実行している場合:

marcs-mbp:sbin marcstreeter$ ./rabbitmq-server

              RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.

Celery 3.1.12 がローカルで実行されている場合:

 -------------- celery@Marcs-MacBook-Pro.local v3.1.12 (Cipater)
---- **** -----
--- * ***  * -- Darwin-13.2.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x105dea3d0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

次に、メソッドをインポートして、「task_id」で結果を取得できます。

from tasks import add, mul
from celery.result import AsyncResult

result = add.delay(2,2)
task_id = result.task_id
result.get() # 4

result = AsyncResult(id=task_id)
result.get() # 4

result = add.AsyncResult(id=task_id)
result.get() # 4

# and the same for the 'mul' task. Just imagine I put it here

次の例では、これらのステップをプロセス間で分割します。あるプロセスでは、次のように「task_id」を取得します。

from tasks import add

result = add.delay(5,5)
task_id = result.task_id

そして、別のプロセスで同じ 'task_id' を使用する場合 (コピーして別の REPL に貼り付けるか、別の HTTP 要求で)、次のようにします。

from celery.result import AsyncResult

result = AsyncResult(id="copied_task_id", backend="db+mysql://u:p@localhost/db")
result.get() # AttributeError: 'str' object has no attribute 'get_task_meta'
result.state # AttributeError: 'str' object has no attribute 'get_task_meta'
result.status # AttributeError: 'str' object has no attribute 'get_task_meta'

そして、私がそうする場合、別のプロセスで:

from task import add # in this instance I know that an add task was performed

result = add.AsyncResult(id="copied_task_id")
result.status # "SUCCESSFUL"
result.state # "SUCCESSFUL"
result.get() # 10

どのタスクが結果を生成しているかを事前に知らなくても、結果を取得できるようにしたいと考えています。私の実際の環境では、この task_id をクライアントに返し、HTTP リクエストを介してジョブのステータスを照会できるようにする予定です。

4

2 に答える 2