タイミングタスクとして何らかの関数を使用したい
私のviews.py:
class SomeHandler:
def __init__(self, user, instance):
self.user = user
self.instance = instance
@task(filter=task_method)
def send_some_msg(self, send_type):
XXXXXXXXXXXXXXX
私のadmin.py:
def somefunction(self, obj):
user = self._user_filter(obj)
name = obj.id
task = "myapp.views.send_some_msg"
crontab_time = {
'month_of_year': obj.timing.month,
'day_of_month': obj.timing.day,
'hour': obj.timing.hour,
'minute': obj.timing.minute
}
crontab = celery_models.CrontabSchedule.objects.create(**crontab_time)
task = celery_models.PeriodicTask.objects.create(
name=name, task=task, kwargs=json.dumps({'send_type': True}), enabled=True, crontab=crontab, expires=obj.timing + timedelta(days=1)
)
しかし、セロリワーカーでエラーが発生しました
TypeError: save_multiple_notification() missing 1 required positional argument: 'self'
そのタスクをどのように使用できますか?? 「タスク」文字列に価値をもたらすことはできません