1

Celery (RabbitMQ を使用) にロガーがあり、緊急の場合にその作業を複製したいと考えています。

# tasks.py
@task
def log(message):
    with open('test.txt', 'a') as f:
        f.write(message)


# views.py
log.delay(message)

Celery がlog()呼び出されたときに、別のマシンで 2 つのインスタンスを実行するにはどうすればよいですか?

これを行うことはまったく意味がありますか?

これは RabbitMQ で可能です。トピックベースの exchangeを使用している場合、1 つのメッセージを 2 つの異なるキューに入れ、2 つの受信者に個別に配信できることは明らかです。

sender =>
[message, routing_key=event.logging.log] => [queue A, topic=event.#]     
                                                      => receiver 1
                                         => [queue B, topic=*.logging.*]
                                                      => receiver 2

メッセージは両方のキューに送信され、別のキューからメッセージを盗むことはありません。

4

2 に答える 2

1

これを行うには、(あなたが言うように)トピック交換になるように交換を構成する必要があります。

CELERY_QUEUES = {
   'celery': {
       'exchange': 'celerytopic',
       'exchange_type': 'topic',
       'routing_key': 'celery',
   },
}

次に、AMQPAPIを使用してバックアップ交換を作成できます。

 from celery import current_app as celery

 with celery.broker_connection() as conn:
     conn.default_channel.queue_declare(queue='celery.backup', durable=True)
     conn.default_channel.queue_bind(queue='celery.backup',
                                     exchange='celerytopic',
                                     routing_key='celery',
                                     durable=True)

すでにceleryという名前のキューがあるので、最初にそれを削除する必要があるかもしれません。

$ camqadm queue.delete celery
于 2012-07-02T15:35:41.527 に答える
1

このタスクを 2 台の異なるマシンで開始しようとしても意味がありません。少なくともCeleryは、タスクが異なるマシンで実行されることを保証できません-負荷を分散するのはRabbitMQであり、1つのノードが他のノードよりも負荷が少ない場合-実行される2つのタスクはおそらくそのマシンで実行されます...

タスクを使用します。retry代わりは。Celery は、実行に失敗した場合、タスクを再試行します。Celery は、タスクが失敗したかどうかを理解するほどスマートです。タスクが失敗した場合は何らかの例外を発生させ、正常にログに記録できない場合は黙って戻らないようにしてください。

アップデート:

考えられるワークフローは次のとおりです-タスクの実行を試み、失敗した場合はon_retryでrouting_keyを変更し、フェイルオーバーキューになる可能性のある別の交換/キューでタスクの実行を試みます。

于 2012-07-02T11:21:05.960 に答える