3

タイミングタスクとして何らかの関数を使用したい

私のviews.py:

class SomeHandler:
    def __init__(self, user, instance):
        self.user = user
        self.instance = instance

    @task(filter=task_method)
    def send_some_msg(self, send_type):
        XXXXXXXXXXXXXXX

私のadmin.py:

    def somefunction(self, obj):
        user = self._user_filter(obj)
        name = obj.id
        task = "myapp.views.send_some_msg"
        crontab_time = {
            'month_of_year': obj.timing.month,
            'day_of_month': obj.timing.day,
            'hour': obj.timing.hour,
            'minute': obj.timing.minute
        }
        crontab = celery_models.CrontabSchedule.objects.create(**crontab_time)
        task = celery_models.PeriodicTask.objects.create(
            name=name, task=task, kwargs=json.dumps({'send_type': True}), enabled=True, crontab=crontab, expires=obj.timing + timedelta(days=1)

        )            

しかし、セロリワーカーでエラーが発生しました

TypeError: save_multiple_notification() missing 1 required positional argument: 'self'

そのタスクをどのように使用できますか?? 「タスク」文字列に価値をもたらすことはできません

4

0 に答える 0