54

次のように定義された関数がある場合:

def add(x,y):
  return x+y

この関数をセロリPeriodicTaskとして動的に追加し、実行時に開始する方法はありますか?(擬似コード)のようなことができるようにしたいと思います:

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)

また、(擬似コード)のようなものを使用して、そのタスクを動的に停止または削除したいと思います。

celery.beat.remove_task(some_unique_task_id)

また

celery.beat.stop(some_unique_task_id)

参考までに、私はdjceleryを使用していません。djceleryを使用すると、django管理者を介して定期的なタスクを管理できます。

4

8 に答える 8

41

この質問はグーグルグループで答えられました。

私は著者ではありません、すべてのクレジットはジャン・マークに行きます

これに対する適切な解決策は次のとおりです。動作を確認しました。私のシナリオでは、定期タスクをサブクラス化し、必要に応じてモデルに他のフィールドを追加できるため、また「終了」メソッドを追加できるため、モデルを作成しました。定期的なタスクのenabledプロパティをFalseに設定し、削除する前に保存する必要があります。サブクラス化全体は必須ではありません。schedule_everyメソッドが実際に機能します。タスクを終了する準備ができたら(サブクラス化していない場合)、PeriodicTask.objects.filter(name = ...)を使用してタスクを検索し、無効にしてから削除します。

お役に立てれば!

from djcelery.models import PeriodicTask, IntervalSchedule
from datetime import datetime

class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
    """ schedules a task by name every "every" "period". So an example call would be:
         TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
         that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules: # just check if interval schedules exist like that already and reuse em
            interval_schedule = interval_schedules[0]
        else: # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every # should check to make sure this is a positive int
            interval_schedule.period = period 
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        if args:
            ptask.args = args
        if kwargs:
            ptask.kwargs = kwargs
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """pauses the task"""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """starts the task"""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
于 2013-07-08T10:55:52.077 に答える
25

これは、celeryv4.1.0に含まれている修正によって最終的に可能になりました。これで、データベースバックエンドのスケジュールエントリを変更するだけで、celery-beatは新しいスケジュールに従って動作します。

ドキュメントでは、これがどのように機能するかを漠然と説明しています。celery-beatのデフォルトのスケジューラーは、スケジュールデータベースとしてshelveファイルPersistentSchedulerを使用します。インスタンス内のディクショナリへの変更はすべてこのデータベースと同期され(デフォルトでは3分ごと)、その逆も同様です。ドキュメントでは、usingに新しいエントリを追加する方法について説明しています。既存のエントリを変更するには、同じで新しいエントリを追加するだけです。辞書から行うのと同じようにエントリを削除します。beat_schedulePersistentSchedulerbeat_scheduleapp.add_periodic_tasknamedel app.conf.beat_schedule['name']

外部アプリを使用してセロリのビートスケジュールを監視および変更するとします。次に、いくつかのオプションがあります。

  1. openデータベースファイルを棚上げして、辞書のようにその内容を読み取ることができます。変更のためにこのファイルに書き戻します。
  2. Celeryアプリの別のインスタンスを実行し、そのインスタンスを使用して、上記のようにシェルフファイルを変更できます。
  3. django-celery-beatのカスタムスケジューラクラスを使用して、スケジュールをdjangoが管理するデータベースに保存し、そこのエントリにアクセスできます。
  4. celerybeat-mongoのスケジューラーを使用して、スケジュールをMongoDBバックエンドに保存し、そこのエントリーにアクセスできます。
于 2019-04-10T00:36:19.337 に答える
21

いいえ、申し訳ありませんが、これは通常のセロリビートでは不可能です。

ただし、必要な操作を簡単に拡張できます。たとえば、django-celeryスケジューラーは、スケジュールをデータベースに読み書きするサブクラスです(いくつかの最適化が上にあります)。

また、Django以外のプロジェクトでもdjango-celeryスケジューラーを使用できます。

このようなもの:

  • django+django-celeryをインストールします。

    $ pip install -Udjangodjango-セロリ

  • celeryconfigに次の設定を追加します。

    DATABASES = {
        'default': {
            'NAME': 'celerybeat.db',
            'ENGINE': 'django.db.backends.sqlite3',
        },
    }
    INSTALLED_APPS = ('djcelery', )
    
  • データベーステーブルを作成します。

    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
    
  • データベーススケジューラを使用してcelerybeatを起動します。

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
        -S djcelery.schedulers.DatabaseScheduler
    

また、djcelerymonDjango以外のプロジェクトで同じプロセスでcelerycamとDjango Admin Webサーバーを起動するために使用できるコマンドがあります。これを使用して、定期的なタスクを素敵なWebインターフェイスで編集することもできます。

   $ djcelerymon

(何らかの理由で、Ctrl +Cを使用してdjcelerymonを停止できないことに注意してください。Ctrl+Z+ kill%1を使用する必要があります)

于 2012-04-19T10:00:53.870 に答える
8

必要なモデルを提供するdjango-celery-beatというライブラリがあります。新しい定期的なタスクを動的にロードするには、独自のスケジューラを作成する必要があります。

from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()

        return self._schedule
于 2017-05-03T06:15:58.333 に答える
3

フラスコとdjceleryを構成し、参照可能なRESTAPIを提供するこのflask-djceleryを確認できます。

于 2016-11-16T20:41:37.727 に答える
3

@asksolからの答えは、Djangoアプリケーションの場合に必要なものです。

非djangoアプリケーションの場合、ファイルの代わりにデータベースも使用するため、Djangoのdjango-celery-beatのcelery-sqlalchemy-schedulerようにモデル化されたものを使用できます。celerybeat-schedule

これは、新しいタスクを実行時に追加する例です。

tasks.py

from celery import Celery

celery = Celery('tasks')

beat_dburi = 'sqlite:///schedule.db'

celery.conf.update(
    {'beat_dburi': beat_dburi}
)


@celery.task
def my_task(arg1, arg2, be_careful):
    print(f"{arg1} {arg2} be_careful {be_careful}")

ログ(プロデューサー)

$ celery --app=tasks beat --scheduler=celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler --loglevel=INFO
celery beat v5.1.2 (sun-harmonics) is starting.
[2021-08-20 15:20:20,927: INFO/MainProcess] beat: Starting...

ログ(消費者)

$ celery --app=tasks worker --queues=celery --loglevel=INFO
-------------- celery@ubuntu20 v5.1.2 (sun-harmonics)
[2021-08-20 15:20:02,287: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//

データベースのスケジュール

$ sqlite3 schedule.db 
sqlite> .databases
main: /home/nponcian/Documents/Program/1/db/schedule.db
sqlite> .tables
celery_crontab_schedule       celery_periodic_task_changed
celery_interval_schedule      celery_solar_schedule       
celery_periodic_task        
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|

ここで、これらのワーカーが既に実行されている間に、新しいスケジュールされたタスクを追加してスケジュールを更新しましょう。これは実行時に実行され、ワー​​カーを再起動する必要がないことに注意してください。

$ python3
>>> # Setup the session.
>>> from celery_sqlalchemy_scheduler.models import PeriodicTask, IntervalSchedule
>>> from celery_sqlalchemy_scheduler.session import SessionManager
>>> from tasks import beat_dburi
>>> session_manager = SessionManager()
>>> engine, Session = session_manager.create_session(beat_dburi)
>>> session = Session()
>>> 
>>> # Setup the schedule (executes every 10 seconds).
>>> schedule = session.query(IntervalSchedule).filter_by(every=10, period=IntervalSchedule.SECONDS).first()
>>> if not schedule:
...     schedule = IntervalSchedule(every=10, period=IntervalSchedule.SECONDS)
...     session.add(schedule)
...     session.commit()
... 
>>> 
>>> # Create the periodic task
>>> import json
>>> periodic_task = PeriodicTask(
...     interval=schedule,                  # we created this above.
...     name='My task',                     # simply describes this periodic task.
...     task='tasks.my_task',               # name of task.
...     args=json.dumps(['arg1', 'arg2']),
...     kwargs=json.dumps({
...        'be_careful': True,
...     }),
... )
>>> session.add(periodic_task)
>>> session.commit()

データベーススケジュール(更新)

  • これで、新しく追加されたスケジュールが、セロリビートスケジューラによって継続的に読み取られるデータベースに反映されていることがわかります。したがって、argsまたはkwargsの値で更新があった場合、データベースでSQL更新を簡単に実行でき、実行中のワーカーでリアルタイムに反映する必要があります(再起動する必要はありません)。
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
2|My task|tasks.my_task|1|||["arg1", "arg2"]|{"be_careful": true}||||||0||1||0|2021-08-20 07:26:49|

ログ(プロデューサー)

  • 現在、新しいタスクは10秒ごとにキューに入れられています
[2021-08-20 15:26:51,768: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2021-08-20 15:26:51,768: INFO/MainProcess] Writing entries...
[2021-08-20 15:27:01,789: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:11,776: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:21,791: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)

ログ(消費者)

  • 新しく追加されたタスクは、10秒ごとに時間どおりに正しく実行されます
[2021-08-20 15:27:01,797: INFO/MainProcess] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] received
[2021-08-20 15:27:01,798: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:01,799: WARNING/ForkPoolWorker-4] 

[2021-08-20 15:27:01,799: INFO/ForkPoolWorker-4] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] succeeded in 0.000763321000704309s: None
[2021-08-20 15:27:11,783: INFO/MainProcess] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] received
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] 

[2021-08-20 15:27:11,787: INFO/ForkPoolWorker-4] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] succeeded in 0.0006725780003762338s: None
[2021-08-20 15:27:21,797: INFO/MainProcess] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] received
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] 

[2021-08-20 15:27:21,800: INFO/ForkPoolWorker-4] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] succeeded in 0.0006371149993356084s: None
于 2021-08-20T07:32:58.600 に答える
2

柔軟に追加/削除できるCelery+Redisの同じソリューションを探していました。これ、 redbeat、Herokuの同じ男、Redis+Sentinelもチェックしてください。

助けてくれることを願っています:)

于 2020-03-09T05:25:25.823 に答える
0

Celeryは、データベースを使用して動的な定期的なタスクを実現し、それ自体を呼び出すことができます。

ただし、APScheduleの方が優れています。

動的な定期的なタスクは常に長いカウントダウンまたはイータを意味するためです。これらの定期的なタスクが多すぎると、大量のメモリを消費する可能性があり、遅延のないタスクを再起動して実行するのに時間がかかります。

tasks.py

import sqlite3
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    imports=['tasks'],
)

conn = sqlite3.connect('database.db', check_same_thread=False)
c = conn.cursor()
sql = '''
CREATE TABLE IF NOT EXISTS `tasks` 
(
   `id` INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,
   `name` TEXT,
   `countdown` INTEGER
);
'''
c.execute(sql)


def create(name='job', countdown=5):
    sql = 'INSERT INTO `tasks` (`name`, `countdown`) VALUES (?, ?)'
    c.execute(sql, (name, countdown))
    conn.commit()
    return c.lastrowid


def read(id=None, verbose=False):
    sql = 'SELECT * FROM `tasks` '
    if id:
        sql = 'SELECT * FROM `tasks` WHERE `id`={}'.format(id)
    all_rows = c.execute(sql).fetchall()
    if verbose:
        print(all_rows)
    return all_rows


def update(id, countdown):
    sql = 'UPDATE `tasks` SET `countdown`=? WHERE `id`=?'
    c.execute(sql, (countdown, id))
    conn.commit()


def delete(id, verbose=False):
    sql = 'DELETE FROM `tasks` WHERE `id`=?'
    affected_rows = c.execute(sql, (id,)).rowcount
    if verbose:
        print('deleted {} rows'.format(affected_rows))
    conn.commit()


@app.task
def job(id):
    id = read(id)
    if id:
        id, name, countdown = id[0]
    else:
        logger.info('stop')
        return

    logger.warning('id={}'.format(id))
    logger.warning('name={}'.format(name))
    logger.warning('countdown={}'.format(countdown))

    job.apply_async(args=(id,), countdown=countdown)

main.py

from tasks import *

id = create(name='job', countdown=5)
job(id)
# job.apply_async((id,), countdown=5)  # wait 5s

print(read())

input('enter to update')
update(id, countdown=1)

input('enter to delete')
delete(id, verbose=True)
于 2021-04-20T06:21:00.823 に答える