13

私は労働者の小さなプール(4)と非常に大きなタスクのリスト(5000〜)を持っています。私はプールを使用していて、map_async()でタスクを送信しています。私が実行しているタスクはかなり長いので、1つの長いプロセスがいくつかの短いプロセスを保持できないように、チャンクサイズを1に強制しています。

私がやりたいのは、提出する必要のあるタスクの数を定期的にチェックすることです。最大で4つがアクティブになることはわかっていますが、処理する必要があるのはいくつ残っているかが気になります。

私はグーグルで検索しましたが、これを行っている人を見つけることができません。

役立つ簡単なコード:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break
4

4 に答える 4

9

あなたが望むもののように見えjobs._number_leftます。_これは、開発者の気まぐれで変更される可能性のある内部値であることを示していますが、その情報を取得する唯一の方法のようです。

于 2011-04-04T20:10:13.120 に答える
1

私が知っている気密な方法はありませんが、Pool.imap_unordered()map_asyncの代わりに関数を使用すると、処理される要素をインターセプトできます。

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

減算process_countします。これは、2つの例外のいずれかを除いて、すべてのプロセスが処理されるとほぼ想定できるためです。1)イテレータを使用する場合、消費および処理するアイテムがこれ以上残っていない可能性があります。2)少ない可能性があります。残り4アイテム以上。私は最初の例外のためにコーディングしませんでした。しかし、必要に応じてそうするのはかなり簡単なはずです。とにかく、あなたの例はリストを使用しているので、その問題はないはずです。

編集:また、Whileループを使用していることに気付きました。これにより、何かを定期的に、たとえば0.5秒ごとなどに更新しようとしているように見えます。私が例として挙げたコードは、そのようにはしません。それが問題かどうかはわかりません。

于 2011-04-04T19:14:19.850 に答える
1

同様の要件があります。進捗状況を追跡し、結果に基づいて中間作業を実行し、任意の時点ですべての処理をクリーンに停止します。私がこれに対処した方法は、タスクを一度に1つずつ送信することですapply_async。私がしていることの非常に単純化されたバージョン:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

Queue結果を使用する代わりに使用することに注意してくださいreturn

于 2015-08-20T09:07:14.983 に答える
1

Pool._cacheを使用していると仮定して属性を確認することで、保留中のジョブの数を確認できますapply_async。これは、使用可能になるまで保存される場所であり、保留中のApplyResultの数と等しくなります。ApplyResult

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()
于 2018-08-24T05:42:05.023 に答える