19

複数の program.py インスタンスを同時に実行したいのですが、同時に実行するインスタンスの数を制限します (たとえば、システムで使用可能な CPU コアの数に制限します)。たとえば、10 個のコアがあり、program.py を合計 1000 回実行する必要がある場合、常に 10 個のインスタンスのみが作成されて実行されます。

マルチプロセッシング モジュール、マルチスレッド、およびキューの使用を試みましたが、簡単な実装に役立つと思われるものは何もありません。私が抱えている最大の問題は、同時に実行されるプロセスの数を制限する方法を見つけることです。一度に 1000 個のプロセスを作成すると、フォーク爆弾と同等になるため、これは重要です。プログラムによってプロセスから返される結果 (ディスクに出力される) は必要なく、プロセスはすべて互いに独立して実行されます。

これをPythonまたはbashで実装する方法の提案や例を教えてください。キューを使用してこれまでに作成したコードを投稿しますが、意図したとおりに機能せず、すでに間違ったパスをたどっている可能性があります。

どうもありがとう。

4

4 に答える 4

26

Pool.mapアプローチはあまり意味がないとおっしゃっていたと思います。マップは、作業のソースを提供する簡単な方法であり、各アイテムに適用するための呼び出し可能です。マップのfuncforは、指定された引数で実際の作業を行うための任意のエントリポイントにすることができます。

それがあなたにとって正しくないと思われる場合は、生産者/消費者パターンの使用について、ここにかなり詳細な回答があります:https ://stackoverflow.com/a/11196615/496445

基本的に、キューを作成し、N個のワーカーを開始します。次に、メインスレッドからキューにフィードするか、キューにフィードするProducerプロセスを作成します。ワーカーはキューから作業を継続するだけであり、開始したプロセスの数よりも多くの同時作業が発生することはありません。

プロデューサーが消費する速度とリソースにも制約を課す必要がある場合は、キューに制限を設定して、未処理の作業が既に多すぎる場合にプロデューサーをブロックするオプションもあります。

呼び出される仕事関数は、あなたが望むことを何でもすることができます。これは、システムコマンドのラッパーにすることも、Pythonlibをインポートしてメインルーチンを実行することもできます。限られたリソースの下で任意の実行可能ファイルを実行するように構成を設定できる特定のプロセス管理システムがありますが、これはそれを実行するための基本的なPythonアプローチにすぎません。

の他の答えからの抜粋:

基本プール:

from multiprocessing import Pool

def do_work(val):
    # could instantiate some other library class,
    # call out to the file system,
    # or do something simple right here.
    return "FOO: %s" % val

pool = Pool(4)
work = get_work_args()
results = pool.map(do_work, work)

プロセスマネージャーとプロデューサーを使用する

from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()

        # exit signal 
        if item == None:
            return

        # fake work
        time.sleep(.5)
        result = item

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    # this could also be started in a producer process
    # instead of blocking
    iters = itertools.chain(get_work_args(), (None,)*num_workers)
    for item in iters:
        work.put(item)

    for p in pool:
        p.join()

    print results
于 2012-08-17T01:02:10.217 に答える
3

プロセス スーパーバイザーを使用する必要があります。1 つのアプローチは、 Circusが提供する API を使用して「プログラムで」行うことです。ドキュメント サイトは現在オフラインですが、一時的な問題だと思います。とにかく、Circus を使用してこれを処理できます。別のアプローチは、supervisordを使用し、プロセスのパラメーターnumprocsを使用しているコアの数に設定することです。

Circus を使用した例:

from circus import get_arbiter

arbiter = get_arbiter("myprogram", numprocesses=3)
try:
    arbiter.start()
finally:
    arbiter.stop()
于 2012-08-17T00:18:00.793 に答える
2

Python ではなく Bash スクリプトですが、単純な並列処理によく使用します。

#!/usr/bin/env bash
waitForNProcs()
{
 nprocs=$(pgrep -f $procName | wc -l)
 while [ $nprocs -gt $MAXPROCS ]; do
  sleep $SLEEPTIME
  nprocs=$(pgrep -f $procName | wc -l)
 done
}
SLEEPTIME=3
MAXPROCS=10
procName=myPython.py
for file in ./data/*.txt; do
 waitForNProcs
 ./$procName $file &
done

または非常に単純なケースの場合、別のオプションは xargs で、P は procs の数を設定します

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
于 2012-08-17T00:04:56.037 に答える
1

multiprocessing.pool の使用に関する多くの回答がありますが、メモリ使用量が重要な場合に実際により有益な multiprocessing.Process の使用方法に関するコード スニペットは多くありません。1000 個のプロセスを開始すると、CPU が過負荷になり、メモリが失われます。各プロセスとそのデータ パイプラインがメモリを集中的に使用する場合、OS または Python 自体が並列プロセスの数を制限します。以下のコードを開発して、バッチで CPU に送信されるジョブの同時数を制限しました。バッチ サイズは、CPU コアの数に比例してスケーリングできます。私の Windows PC では、バッチあたりのジョブ数は、利用可能な CPU コアの 4 倍まで効率的です。

import multiprocessing
def func_to_be_multiprocessed(q,data):
    q.put(('s'))
q = multiprocessing.Queue()
worker = []
for p in range(number_of_jobs):
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \
        args=(q,data)...))
num_cores = multiprocessing.cpu_count()
Scaling_factor_batch_jobs = 3.0
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs
num_of_batches = number_of_jobs // num_jobs_per_batch
for i_batch in range(num_of_batches):
    floor_job = i_batch * num_jobs_per_batch
    ceil_job  = floor_job + num_jobs_per_batch
    for p in worker[floor_job : ceil_job]:
                                         worker.start()
    for p in worker[floor_job : ceil_job]:
                                         worker.join()
for p in worker[ceil_job :]:
                           worker.start()
for p in worker[ceil_job :]:
                           worker.join()
for p in multiprocessing.active_children():
                           p.terminate()
result = []
for p in worker:
   result.append(q.get())

唯一の問題は、バッチ内のジョブのいずれかが完了できず、ハング状態になると、ジョブの残りのバッチが開始されないことです。したがって、処理される関数には、適切なエラー処理ルーチンが必要です。

于 2017-12-29T13:16:50.760 に答える