1

MyDocと呼ばれるMongoEngineスキーマに続くアクセス可能なmongoDBコレクションに数十万のドキュメントがあります。これらの各ドキュメントで実行されるものがあります(my_operationと呼びます)。また、my_operationは、create_data_dict関数を介して構築されたdata_dictと呼ばれるOrderedDictを必要とします(読み取りのみ、変更はしません)。そして、セロリワーカーを介してmy_operationを並行して実行できるようにしたいと考えています。

セットアップには、django、mongo、mongoengine、celeryが含まれます。

オプション1:

@celery.task()
def my_operation(my_doc_list):
   data_dict = create_data_dict()
   for doc in my_doc_list:
       do_something_to_doc(data_dict, doc)
       doc.save()

def create_data_dict():
   #blah blah blah
   return data_dict

#So I run everything like this:
batch_size = len(MyDoc.objects)/no_of_celery_workers
for b in xrange(0, len(MyDoc.objects), batch_size):
    my_operation.delay(MyDoc.objects[b:b+batch_size])

オプション2:my_operationはdata_dictとMyDocインスタンスを取ります

@celery.task()
def my_operation(data_dict, my_doc):
   do_something_to_doc(data_dict, my_doc)
   my_doc.save()

def create_data_dict():
   #blah blah blah
   return data_dict

#So I run everything like this:
data_dict = create_data_dict()
celery.group([my_operation.s(data_dict, my_doc) for my_doc in MyDoc.objects]).apply_async()

オプション3:

@celery.task()
def my_operation(my_doc):
   data_dict = create_data_dict()
   do_something_to_doc(data_dict, my_doc)
   my_doc.save()

def create_data_dict():
   #blah blah blah
   return data_dict

#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()

オプション4:

@celery.task()
def my_operation(my_doc):
   data_dict = get_data_dict()
   do_something_to_doc(data_dict, my_doc)
   my_doc.save()

def create_data_dict():
   #blah blah blah
   return data_dict

def get_data_dict():
   data_dict = cache.get("data_dict")
   if data_dict is None:
        data_dict = create_data_dict()
        cache.set("data_dict", data_dict)
   return data_dict

#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()

Option1が機能していたら、おそらく質問はしなかっただろうが、残念ながら、クエリセットの応答のスライスを渡すことも、クエリセット自体をセロリワーカーに渡すこともできません。それは、バックトレースが主に言っているように見えたものです。

Option2を使用すると、すべてのタスクでdata_dictを渡すことになり、どういうわけかあまり魅力的に聞こえません。また、複数のマシンでセロリワーカーを実行する場合(これを実行するつもりです)、data_dictは、基本的に1回だけ渡す必要があるため、多くのネットワークを無価値に消費します。

また、Option3の場合、data_dictはすべてのドキュメントに対して新たに作成されますが、これは処理能力の大きな浪費のようです。

オプション4:data_dictを再計算したり、すべてのドキュメントで再送信したりする代わりに、キャッシュを使用してdata_dictをバックアップします。これは最良のアイデアのように聞こえますが、落とし穴があります。次回、すべてのMyDocでmy_operationを実行するときは、キャッシュにあるかどうかに関係なく、data_dictを再計算したいと思います。それを達成する方法はありますか?

質問:それを行うための最良の方法は何ですか?

4

1 に答える 1

0

表面的には、オプション 2 はオプション 1 と同じように聞こえます。オブジェクトまたはそのデータを渡しているためです。

ここには多くの不明な点がありますが、オプション 4 でキャッシュの競合/競合状態が発生する可能性があると述べていることを考えると、毎回データを生成してオブジェクト ID を渡すことを選択するか、それが法外に高価な場合は実装しますキャッシュされたデータを格納し、タスクの一部としてキャッシュを消去するコレクション (競合状態を停止するための findAndModify)。

于 2013-03-01T10:16:34.143 に答える