47

Celery スタンドアロンを使用しています (Django 内ではありません)。複数の物理マシンで 1 つのワーカー タスク タイプを実行する予定です。タスクは次のことを行います

  1. XML ドキュメントを受け入れます。
  2. それを変換します。
  3. 複数データベースの読み取りと書き込みを行います。

私は PostgreSQL を使用していますが、これは接続を使用する他の種類のストアにも同様に当てはまります。以前は、データベース接続プールを使用して、リクエストごとに新しいデータベース接続を作成したり、接続を長時間開いたままにしたりしないようにしていました。ただし、各 Celery ワーカーは個別のプロセスで実行されるため、実際にどのようにプールを共有できるかはわかりません。何か不足していますか?Celery を使用すると、Celery ワーカーから返された結果を永続化できることは知っていますが、ここで行おうとしているのはそれではありません。各タスクは、処理されるデータに応じて、いくつかの異なる更新または挿入を実行できます。

Celery ワーカー内からデータベースにアクセスする正しい方法は何ですか?

複数のワーカー/タスク間でプールを共有することは可能ですか? またはこれを行う他の方法はありますか?

4

6 に答える 6

37

ワーカーごとに 1 つの接続という tigeronk2 のアイデアが気に入っています。彼が言うように、Celery は独自のワーカー プールを保持しているため、別個のデータベース接続プールは実際には必要ありません。Celery Signalのドキュメントでは、ワーカーの作成時にカスタム初期化を行う方法が説明されているため、次のコードを tasks.py に追加したところ、期待どおりに動作するようです。ワーカーがシャットダウンされているときに、接続を閉じることさえできました。

from celery.signals import worker_process_init, worker_process_shutdown

db_conn = None

@worker_process_init.connect
def init_worker(**kwargs):
    global db_conn
    print('Initializing database connection for worker.')
    db_conn = db.connect(DB_CONNECT_STRING)


@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global db_conn
    if db_conn:
        print('Closing database connectionn for worker.')
        db_conn.close()
于 2014-11-10T22:11:06.407 に答える
3

ワーカー プロセスごとに 1 つの DB 接続を使用します。セロリ自体がワーカー プロセスのプールを維持するため、データベース接続は常にセロリ ワーカーの数と同じになります。裏を返せば、db 接続プーリングをセロリ ワーカー プロセス管理に結びつけることです。しかし、GIL がプロセス内で一度に 1 つのスレッドしか許可しないことを考えると、それは問題ありません。

于 2013-01-27T17:05:16.290 に答える
2

デフォルトの動作をオーバーライドして、セロリ構成のプロセスごとのワーカーではなく、スレッド化されたワーカーを使用できます。

CELERYD_POOL = "celery.concurrency.threads.TaskPool"

次に、共有プールインスタンスをタスクインスタンスに保存し、スレッド化された各タスク呼び出しから参照できます。

于 2013-01-25T17:01:27.163 に答える
1

おそらく、celery.concurrency.geventはプール共有を提供し、GILを悪化させない可能性があります。ただし、そのサポートはまだ「実験的」です。

そして、すべてが単一のプロセス/スレッドで実行されるグリーンレット(コルーチン)間で共有するpsycopg2.pool.SimpleConnectionPool 。

このトピックに関する他のスタックディスカッションのほんの少し。

于 2013-01-25T23:05:13.203 に答える
0

実装および監視することにより、私の調査結果に貢献します。

ようこそフィードバック。

参照: プーリングを使用するhttp://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html

各ワーカー プロセス (-ck で指定された prefork モード) は、プーリングや再利用なしで、DB への新しい接続を 1 つ確立します。そのため、プーリングを使用している場合、プールは各ワーカー プロセス レベルでのみ表示されます。したがって、プール サイズ > 1 は役に立ちませんが、接続の再利用は、オープンとクローズから接続を保存するのに問題ありません。

ワーカー プロセスごとに 1 つの接続を使用する場合、初期化フェーズでワーカー プロセス (prefork モード celery -A app worker -ck) ごとに 1 つの DB 接続が確立されます。開閉を繰り返して接続を保存します。

ワーカー スレッド (イベントレット) の数に関係なく、各ワーカー スレッド (セロリ -A アプリ ワーカー -P イベントレット) は、プーリングや再利用なしで DB への接続を 1 つだけ確立します。したがって、イベントレットの場合、1 つのセロリ プロセス (セロリ -A アプリ ワーカー ...) 上のすべてのワーカー スレッド (イベントレット) は、各瞬間に 1 つのデータベース接続を持ちます。

セロリのドキュメントによると

ただし、タスクがブロッキング呼び出しを実行しないようにする必要があります。これは、ブロッキング呼び出しが返されるまで、ワーカー内の他のすべての操作を停止するためです。

おそらく、MYSQL DB 接続の方法が呼び出しをブロックしていることが原因です。

于 2016-07-26T05:56:10.727 に答える