私はCeleryを初めて使用し、これは(1)他の場所で回答されている可能性がある(そうであれば、回答が見つからなかった)、または(2)目的を達成するためのより良い方法があるかもしれないという事実で私の質問を前置きします私が直接求めているものよりも。
また、私は を認識していますが、探しているものを完全には達成していませんcelery.contrib.methods
。task_method
私の目的
クラス全体を Celery タスクに変えるクラス mixin を作成したいと思います。たとえば、以下のコードのようなもので表される mixin (現在は実行されません):
from celery import Task
class ClassTaskMixin(Task):
@classmethod
def enqueue(cls, *args, **kwargs):
cls.delay(*args, **kwargs)
def run(self, *args, **kwargs):
Obj = type(self.name, (), {})
Obj(*args, **kwargs).run_now()
def run_now(self):
raise NotImplementedError()
を使用する場合とは異なり、タスクがキューに入れられて呼び出される前に、クラスを完全にインスタンス化したくtask_method
ありません。.delay()
むしろ、関連する初期化パラメーターとともにクラス名を非同期プロセスに渡すだけです。次に、非同期プロセスは、クラス名と指定された初期化パラメーターを使用してクラスを完全にインスタンス化.run_now()
し、インスタンス化されたオブジェクトで何らかのメソッド (たとえば、) を呼び出します。
ユースケースの例
私が必要とする mixin の使用例としては、電子メールの作成と非同期的な送信があります。
class WelcomeEmail(EmailBase, ClassTaskMixin):
def __init__(self, recipient_address, template_name, template_context):
self.recipient_address = recipient_address
self.template_name = template_name
self.template_context = template_context
def send(self):
self.render_templates()
self.construct_mime()
self.archive_to_db()
self.send_smtp_email()
def run_now(self):
self.send()
上記のコードは、 を呼び出すことにより、非同期 Celery プロセスで電子メールを送信しますWelcomeEmail.enqueue(recipient_address, template_name, template_context)
。電子メールをプロセス内で同期的に送信するには、 を呼び出しWelcomeEmail(recipient_address, template_name, template_context).send()
ます。
質問
- Celeryフレームワーク内で私がやろうとしていることは非常に間違っているという理由はありますか?
- ミックスインを構成して、私が提案したものよりも Celery-onic にするためのより良い方法はありますか (より良い属性名、異なるメソッド構造など)?
- 私が説明したように、ユースケースでミックスインを機能させるために何が欠けていますか?