MyDocと呼ばれるMongoEngineスキーマに続くアクセス可能なmongoDBコレクションに数十万のドキュメントがあります。これらの各ドキュメントで実行されるものがあります(my_operationと呼びます)。また、my_operationは、create_data_dict関数を介して構築されたdata_dictと呼ばれるOrderedDictを必要とします(読み取りのみ、変更はしません)。そして、セロリワーカーを介してmy_operationを並行して実行できるようにしたいと考えています。
セットアップには、django、mongo、mongoengine、celeryが含まれます。
オプション1:
@celery.task()
def my_operation(my_doc_list):
data_dict = create_data_dict()
for doc in my_doc_list:
do_something_to_doc(data_dict, doc)
doc.save()
def create_data_dict():
#blah blah blah
return data_dict
#So I run everything like this:
batch_size = len(MyDoc.objects)/no_of_celery_workers
for b in xrange(0, len(MyDoc.objects), batch_size):
my_operation.delay(MyDoc.objects[b:b+batch_size])
オプション2:my_operationはdata_dictとMyDocインスタンスを取ります
@celery.task()
def my_operation(data_dict, my_doc):
do_something_to_doc(data_dict, my_doc)
my_doc.save()
def create_data_dict():
#blah blah blah
return data_dict
#So I run everything like this:
data_dict = create_data_dict()
celery.group([my_operation.s(data_dict, my_doc) for my_doc in MyDoc.objects]).apply_async()
オプション3:
@celery.task()
def my_operation(my_doc):
data_dict = create_data_dict()
do_something_to_doc(data_dict, my_doc)
my_doc.save()
def create_data_dict():
#blah blah blah
return data_dict
#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()
オプション4:
@celery.task()
def my_operation(my_doc):
data_dict = get_data_dict()
do_something_to_doc(data_dict, my_doc)
my_doc.save()
def create_data_dict():
#blah blah blah
return data_dict
def get_data_dict():
data_dict = cache.get("data_dict")
if data_dict is None:
data_dict = create_data_dict()
cache.set("data_dict", data_dict)
return data_dict
#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()
Option1が機能していたら、おそらく質問はしなかっただろうが、残念ながら、クエリセットの応答のスライスを渡すことも、クエリセット自体をセロリワーカーに渡すこともできません。それは、バックトレースが主に言っているように見えたものです。
Option2を使用すると、すべてのタスクでdata_dictを渡すことになり、どういうわけかあまり魅力的に聞こえません。また、複数のマシンでセロリワーカーを実行する場合(これを実行するつもりです)、data_dictは、基本的に1回だけ渡す必要があるため、多くのネットワークを無価値に消費します。
また、Option3の場合、data_dictはすべてのドキュメントに対して新たに作成されますが、これは処理能力の大きな浪費のようです。
オプション4:data_dictを再計算したり、すべてのドキュメントで再送信したりする代わりに、キャッシュを使用してdata_dictをバックアップします。これは最良のアイデアのように聞こえますが、落とし穴があります。次回、すべてのMyDocでmy_operationを実行するときは、キャッシュにあるかどうかに関係なく、data_dictを再計算したいと思います。それを達成する方法はありますか?
質問:それを行うための最良の方法は何ですか?