3

シナリオ:

新しいビルドのために非常に大規模なDBモデルの移行が行われており、現在のライブデータをWebアプリからローカルテストデータベースに移行する方法の定型化に取り組んでいます。

モデルの移行を同時に処理するスクリプトをPythonでセットアップしたいと思います。モデルインスタンスのメソッドがfrom_legacyあります。to_legacyこれまでのところ、すべてのインスタンスをロードしてthreadsそれぞれに作成します。各スレッドは、変換を実行して結果を保存するメソッドをthreading使用して、コアモジュールからサブクラス化されています。run

プログラムのメインループにこれらのスレッドのインスタンスの大きなスタックを構築させ、それらを1つずつ処理し始め、その作業を実行すると同時に最大10個だけ実行し、次のスレッドをフィードします。他のユーザーが移行を終了すると処理されます。

私が理解できないのは、これを行うためにキューを正しく利用する方法ですか?各スレッドが移行の完全なタスクを表す場合、最初にすべてのインスタンスをロードしてから、10に設定されたQueueを作成し、maxsize現在実行中のキューのみを追跡する必要がありますか?おそらくこのようなものですか?

currently_running = Queue()
for model in models:
  task = Migrate(models) #this is subclassed thread
  currently_running.put(task)
  task.start()

putこの場合、容量がいっぱいになっている間にブロックする呼び出しに依存していますか?このルートに行くとしたら、どうやって電話すればいいtask_doneですか?

むしろ、キューには(開始されたタスクだけでなく)すべてのタスクが含まれ、join完了までブロックするために使用する必要がありますか?スレッドのキューを呼び出すjoinと、含まれているスレッドが開始されますか?

「最大でN個の実行中のスレッドがある」問題に取り組むための最良の方法は何ですか?また、キューはどのような役割を果たさなければなりませんか?

4

2 に答える 2

5

文書化されていませんが、multiprocessingモジュールには、ThreadPoolその名前が示すように、スレッドのプールを作成するクラスがあります。multiprocessing.Poolクラスと同じAPIを共有します。

次に、以下を使用してタスクをスレッドプールに送信できますpool.apply_async

import multiprocessing.pool as mpool

def worker(task):
    # work on task
    print(task)     # substitute your migration code here.

# create a pool of 10 threads
pool = mpool.ThreadPool(10)
N = 100

for task in range(N):
    pool.apply_async(worker, args = (task, ))

pool.close()
pool.join()
于 2012-12-13T21:59:39.553 に答える
0

これはおそらくセマフォを使用して行う必要があります。ドキュメントの例は、達成しようとしていることのヒントです。

于 2012-12-13T21:53:03.773 に答える