Celery で長時間実行されているタスクに「一時停止」信号を送信したいのですが、それを行う最善の方法を見つけようとしています。ビューでは、データベースからオブジェクトのインスタンスを取得して保存するように指示できますが、Celery のオブジェクトのインスタンスとは異なります。オブジェクトは、一時停止されているかどうかを確認しません。
長時間実行されるクラスとタスク内からデータベースをポーリングするのは奇妙で実用的ではないように感じるので、インスタンスにメッセージを送信することを考えています。pubsub の使用を検討しましたが、既に Django プロジェクトであるため、Django シグナルを使用することをお勧めします。私はこれに間違った方法でアプローチしているかもしれません。
動作しない例を次に示します。
モデル.py
class LongRunningClass(models.Model):
is_paused = models.BooleanField(default=False)
processed_files = models.IntegerField(default=0)
total_files = models.IntegerField(default=100)
def long_task(self):
remaining_files = self.total_files - self.processed_files
for i in xrange(remaining_files):
if not self.is_paused:
self.processed_files += 1
time.sleep(1)
# Task complete, let's save.
self.save()
Views.py
def pause_task(self, pk):
lrc = LongRunningClass.objects.get(pk=pk)
lrc.is_paused = True
lrc.save()
return HttpResponse(json.dumps({'is_paused': lrc.is_paused}))
def resume_task(self, pk):
lrc = LongRunningClass.objects.get(pk=pk)
lrc.is_paused = False
lrc.save()
# Pretend this is a Celery task
lrc.long_task()
したがって、シグナルを使用するように models.py を変更すると、これらの行を追加できますが、それでもうまくいきません。
pause_signal = django.dispatch.Signal(providing_args=['is_paused'])
@django.dispatch.receiver(pause_signal)
def pause_callback(sender, **kwargs):
if 'is_paused' in kwargs:
sender.is_paused = kwargs['is_paused']
sender.save()
これは、すでに実行されているインスタンス化されたクラスにも影響しません。タスク内で実行中のモデルのインスタンスに一時停止を指示するにはどうすればよいですか?