3

質問

celery の定期的なタスク スケジューラである Beat を介してタスクを実行した後、RabbitMQ に未使用のキューがたくさん残っているのはなぜですか?

設定

  • Heroku で動作する Django Web アプリ
  • セロリビートでスケジュールされたタスク
  • セロリ ワーカーを介して実行されるタスク
  • メッセージ ブローカーは CloudAMQP の RabbitMQ です

プロフィール

web: gunicorn --workers=2 --worker-class=gevent --bind=0.0.0.0:$PORT project_name.wsgi:application
scheduler: python manage.py celery worker --loglevel=ERROR -B -E --maxtasksperchild=1000
worker: python manage.py celery worker -E --maxtasksperchild=1000 --loglevel=ERROR

設定.py

CELERYBEAT_SCHEDULE = {
    'do_some_task': {
        'task': 'project_name.apps.appname.tasks.some_task',
        'schedule': datetime.timedelta(seconds=60 * 15),
        'args': ''
    },
}

タスク.py

@celery.task
def some_task()
    # Get some data from external resources
    # Save that data to the database
    # No return value specified

結果

タスクが実行されるたびに、(RabbitMQ Web インターフェイス経由で) 次の情報が得られます。

  • 「Queued Messages」の下の「Ready」状態の追加メッセージ
  • 「準備完了」状態のメッセージが 1 つある追加のキュー
    • このキューにはコンシューマーがリストされていません
4

2 に答える 2

4

それは私の設定になりましたCELERY_RESULT_BACKEND

以前は、次のとおりでした。

CELERY_RESULT_BACKEND = 'amqp'

次のように変更した後、RabbitMQ に未使用のメッセージ/キューがなくなりました。

CELERY_RESULT_BACKEND = 'database'

どうやら、タスクが実行された後、セロリはそのタスクに関する情報をrabbitmq経由で送り返していたようですが、これらの応答メッセージを消費するためのセットアップが何もなかったため、未読のメッセージがキューに入ってしまいました。 .

注: これは、セロリがタスクの結果を記録するデータベース エントリを追加することを意味します。データベースが無用なメッセージでいっぱいになるのを防ぐために、次のように追加しました。

# Delete result records ("tombstones") from database after 4 hours
# http://docs.celeryproject.org/en/latest/configuration.html#celery-task-result-expires
CELERY_TASK_RESULT_EXPIRES = 14400

Settings.py の関連部分

########## CELERY CONFIGURATION
import djcelery
# https://github.com/celery/django-celery/
djcelery.setup_loader()

INSTALLED_APPS = INSTALLED_APPS + (
    'djcelery',
)

# Compress all the messages using gzip
# http://celery.readthedocs.org/en/latest/userguide/calling.html#compression
CELERY_MESSAGE_COMPRESSION = 'gzip'

# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-transport
BROKER_TRANSPORT = 'amqplib'

# Set this number to the amount of allowed concurrent connections on your AMQP
# provider, divided by the amount of active workers you have.
#
# For example, if you have the 'Little Lemur' CloudAMQP plan (their free tier),
# they allow 3 concurrent connections. So if you run a single worker, you'd
# want this number to be 3. If you had 3 workers running, you'd lower this
# number to 1, since 3 workers each maintaining one open connection = 3
# connections total.
#
# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-pool-limit
BROKER_POOL_LIMIT = 3

# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-connection-max-retries
BROKER_CONNECTION_MAX_RETRIES = 0

# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-url
BROKER_URL = os.environ.get('CLOUDAMQP_URL')

# Previously, had this set to 'amqp', this resulted in many read / unconsumed
# queues and messages in RabbitMQ
# See: http://docs.celeryproject.org/en/latest/configuration.html#celery-result-backend
CELERY_RESULT_BACKEND = 'database'

# Delete result records ("tombstones") from database after 4 hours
# http://docs.celeryproject.org/en/latest/configuration.html#celery-task-result-expires
CELERY_TASK_RESULT_EXPIRES = 14400
########## END CELERY CONFIGURATION
于 2012-11-08T04:21:31.713 に答える
1

消費したタスクから応答が返ってきているようです。

次のようにすることでそれを回避できます。

@celery.task(ignore_result=True)
于 2013-02-19T18:30:08.253 に答える