4

プロジェクトでPythonとセロリを使用しています。プロジェクトには、2つのファイルがあります。

celeryconfig.py

BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("example",)
CELERYD_CONCURRENCY = 2

およびexample.py

from celery.task import task
import hashlib

md5 = hashlib.md5()

@task
def getDigest(text):
    print 'Using md5 - ',md5
    md5.update(text)
    return md5.digest()

celeryconfig.pyで、 CELERYD_CONCURRENCY2に設定しました。これは、タスクキュー内のタスクを2つの異なるプロセスに分散することを意味します。

Pythonコンソールから、次のコマンドを実行します。

from example import getDigest
getDigest.delay('foo');getDigest.delay('bar')

これにより、2人のワーカーによって同時に実行される2つのタスクが作成されます。問題は、両方のワーカープロセスがタスク関数[ getDigest() ]を実行しているため、同じハッシュオブジェクト(md5)を使用しているように見えることです。以下に示すように、 celerydの出力はこれを確認します。

[PoolWorker-2] Using md5 -
[PoolWorker-2] <md5 HASH object @ 0x23e6870>
[PoolWorker-1] Using md5 -
[PoolWorker-1] <md5 HASH object @ 0x23e6870>

簡単にするために、hashlibのmd5オブジェクトを使用していますが、実際のプロジェクトでは、複数のプロセスからアクセスおよび変更できないオブジェクトを使用しています。これにより、ワーカーがクラッシュすることが予想されます。

それは質問を提起します:ワーカープロセスを初期化して独自の( md5)オブジェクトを使用するようにコードを変更するにはどうすればよいですか?現在、それらは同じオブジェクトを共有しているため、アプリケーションがクラッシュします。これは可能ですか?

4

1 に答える 1

7

コードで明示的に指示しているため、同じオブジェクトを使用しています。タスクのスコープ外でオブジェクトを作成し、それをタスク内で使用することにより、すべてのワーカーに共有オブジェクトへのアクセスを許可します。これは同時実行の問題であり、必ずしもCeleryの問題ではありません。オブジェクトが小さい場合はそのコピーを使用することも、独自のロック戦略を使用することもできます。ただし、一般に、オブジェクトが一度に複数のプロセスによって更新される場合は、Celeryの範囲外である何らかの同期を採用する必要があります。

于 2011-12-28T04:33:59.347 に答える