0

このソリューションを使用しようとしましたが、成功しませんでした。タスクを実行する日を指定するにはどうすればよいですか? 私の解決策は

class DayOfMonth(schedule):

    def __init__(self, day=1):
        self.day = day

    def is_due(self, last_run_at):
        now = datetime.datetime.now()
        if now.month != last_run_at.month and now.day == self.day:
            return True, 3000
        return False, 3000

    def __eq__(self, other):
            if isinstance(other, DayOfMonth):
                return self.day == other.day and self.month == other.month
            return False

django-celery で実行しようとしましたが、run_every が指定されていないというエラーが表示されます。

編集1:

以下を追加してタスクを実行します。

"my_task": {
        "task": "util.tasks.CeleryManagementCommand",
        "schedule": DayOfMonth(day=4),
        "args": ('my_task',),
    },

CELERYBEAT_SHEDULE口述する

編集2:

initで run_every を指定すると-> self.run_every = NoneNone 型オブジェクトに属性 total_seconds がないというエラーが表示される

4

1 に答える 1

1

サブクラス化してinitを変更する場合は、必ず親のinitを呼び出すようにしてください。これで問題が解決すると確信しています:

class DayOfMonth(schedule):

    def __init__(self, day=1, *args, **kwargs):
        super(DayOfMonth, self).__init__(*args, **kwargs)
        self.day = day

これを確認すると、発生したエラーが明らかになります: https://github.com/ask/celery/blob/master/celery/schedules.py#L33

于 2012-04-06T16:38:10.187 に答える