13

Celery を使用して、プライマリ サーバーからのイベントによってトリガーされるプッシュ通知と電子メールの送信を処理することを計画しています。

これらのタスクでは、外部サーバー (GCM、APS、電子メール サーバーなど) への接続を開く必要があります。一度に 1 つずつ処理することも、パフォーマンスを大幅に向上させるために 1 つの接続でまとめて処理することもできます。

多くの場合、これらのタスクのいくつかのインスタンスが短期間に別々にトリガーされます。たとえば、1 分間に、さまざまなユーザーにさまざまなメッセージを送信する必要がある数十のプッシュ通知が発生する可能性があります。

セロリでこれを処理する最良の方法は何ですか? メッセージごとに異なるタスクを設定するのが単純な方法のように思えますが、それにはインスタンスごとに接続を開く必要があります。

「すべての未処理のプッシュ通知タスク」などを処理できる、ある種のタスクアグリゲーターがあることを期待していました。

そのようなものは存在しますか?たとえば、アクティブなタスク グループに追加するなど、より良い方法はありますか?

何か不足していますか?

ロバート

4

2 に答える 2

9

celery.contrib.batches私は最近、自分のプロジェクトでモジュールを発見し、実装しました。私の意見では、追加のストレージ層が必要ないため、Tommaso の回答よりも優れたソリューションです。

ドキュメントから直接の例を次に示します。

100 メッセージごと、または 10 秒ごとにバッファをフラッシュするクリック カウンタ。データに対して何も行いませんが、データベースに保存するように簡単に変更できます。

# Flush after 100 messages, or 10 seconds.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    from collections import Counter
    count = Counter(request.kwargs['url'] for request in requests)
    for url, count in count.items():
        print('>>> Clicks: {0} -> {1}'.format(url, count))

ただし、注意してください。私の使用法では問題なく動作しますが、ドキュメントには「実験的なタスククラス」であると記載されています。これは、そのような不安定な説明を持つ機能の使用を思いとどまらせるかもしれません:)

于 2013-11-11T09:49:38.037 に答える
4

これを実現する簡単な方法は、タスクが実行する必要があるすべてのアクションを永続ストレージ (データベースなど) に記述し、定期的なジョブが実際のプロセスを 1 つのバッチで (単一の接続で) 実行するようにすることです。注: キューが 2 回処理されるのを防ぐために、ロックが設定されていることを確認してください。

昆布レベルで同様のことを行う方法についての良い例があります (http://ask.github.com/celery/tutorials/clickcounter.html)

個人的には、sentry がこのようにして db レベルでインクリメントをバッチ処理する方法が気に入っています (sentry.buffers モジュール)。

于 2012-10-03T09:45:43.913 に答える