3

問題

ZEO サーバーにアクセスするパッケージを使用すると、Celery ワーカーがタスクの実行でハングします。ただし、 内でサーバーに直接アクセスする場合は、tasks.pyまったく問題ありません。

バックグラウンド

ZODBファイルを読み書きするプログラムがあります。複数のユーザーがこのデータベースに同時にアクセスして変更できるようにしたいので、複数のプロセスとスレッド間で安全にするZEO サーバーでデータベースを管理します。プログラムのモジュール内でデータベースを定義します。

from ZEO import ClientStorage
from ZODB.DB import DB

addr = 'localhost', 8090
storage = ClientStorage.ClientStorage(addr, wait=False)
db = DB(storage)

SSCCE

私は明らかにもっと複雑な操作を試みていますが、ルート オブジェクトまたはその子のキーのみが必要であると仮定しましょう。このコンテキストで問題を作成できます。

dummy_package上記のコードを使用して、モジュール とdatabases.py、データベース アクセスを実行するための必要最小限のモジュールを作成します。

# main.py

def get_keys(dict_like):
    return dict_like.keys()

でデータベース アクセスを試行しない場合dummy_package、データベースをインポートしてルートに問題なくアクセスできます。

# tasks.py
from dummy_package import databases

@task()
def simple_task():

    connection = databases.db.open()
    keys = connection.root().keys()
    connection.close(); databases.db.close()
    return keys  # Works perfectly

ただし、接続または子を渡そうとするとroot、タスクが無期限にハングします。

@task()
def simple_task():
    connection = databases.db.open()
    root = connection.root()
    ret = main.get_keys(root)  # Hangs indefinitely
    ...

違いがある場合、これらの Celery タスクは Django によってアクセスされます。

質問

では、まず、ここで何が起こっているのでしょうか。この方法で ZEO サーバーにアクセスすることによって何らかの競合状態が発生することはありますか?

すべてのデータベースに Celery の責任でアクセスさせることもできますが、それでは醜いコードになってしまいますさらに、スタンドアロン プログラムとして機能するプログラムの機能が損なわれます。Celery ワーカーによって呼び出されるルーチン内で ZEO と対話することはできませんか?

4

2 に答える 2

2

開いている接続またはそのルート オブジェクトをグローバルとして保存しないでください。

スレッドごとに接続が必要です。ZEO が複数のスレッドのアクセスを可能にするという理由だけで、スレッドローカルではないもの (database.py のモジュールレベルのグローバルなど) を使用しているように思えます。

データベースをグローバルとして保存しますが、各タスク中に db.open() を呼び出します。http://zodb.readthedocs.org/en/latest/api.html#connection-poolを参照してください

于 2013-06-21T14:37:59.553 に答える
0

何が起こっているのか完全には理解できませんが、デッドロックは、Celery がmultiprocessingデフォルトで並行処理に使用するという事実と関係があるのではないかと考えています。ZEO サーバーにアクセスする必要があるタスクに Eventlet を使用するように切り替えると、問題が解決しました。

私のプロセス

  1. Eventletを使用するワーカーと、 standard を使用するワーカーを起動しますmultiproccesing

    celeryはデフォルト キューの名前であるため (歴史的な理由から)、Eventlet ワーカーにこのキューを処理させます。

    $ celery worker --concurrency=500 --pool=eventlet --loglevel=debug \ 
                     -Q celery                --hostname eventlet_worker
    $ celery worker  --loglevel=debug \
                     -Q multiprocessing_queue --hostname multiprocessing_worker
    
  2. 標準を必要とするタスクmultiprocessingを適切なキューにルーティングします。他のすべてはcelery、デフォルトでキュー (イベントレット管理) にルーティングされます。(Django を使用している場合、これは に入りますsettings.py):

    CELERY_ROUTES = {'project.tasks.ex_task': {'queue': 'multiprocessing_queue'}}
    
于 2013-07-01T16:33:27.253 に答える