6

Python 2.7 を使用しています。

現在、次のように ThreadPoolExecuter を使用しています。

params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    result = list(executor.map(f, params))

問題は、f実行時間が長すぎる場合があることです。を実行するたびにf、その実行を 100 秒に制限してから、それを殺します。

最終的に、 の各要素についてxparam強制終了する必要があるかどうかf、また、そうでない場合の戻り値を示したいと思います。1 つのパラメータがタイムアウトしてもf、次のパラメータで実行したい。

このexecuter.mapメソッドにはパラメーターがありますが、各スレッドごとに個別にではなく、timeoutへの呼び出し時から実行全体のタイムアウトを設定します。executer.map

目的の動作を取得する最も簡単な方法は何ですか?

4

1 に答える 1

8

この答えは、関数がネットワーク呼び出しを待機している場合を除き、通常はスレッド ライブラリよりも望ましい python のマルチプロセッシング ライブラリに関するものです。マルチプロセッシング ライブラリとスレッド ライブラリには同じインターフェイスがあることに注意してください。

プロセスがそれぞれ 100 秒間実行される可能性があることを考えると、それぞれのプロセスを作成するオーバーヘッドは比較するとかなり小さくなります。必要な制御を行うには、おそらく独自のプロセスを作成する必要があります。

1 つのオプションは、最大 100 秒間実行される別の関数で f をラップすることです。

from multiprocessing import Pool

def timeout_f(arg):
    pool = Pool(processes=1)
    return pool.apply_async(f, [arg]).get(timeout=100)

次に、コードは次のように変更されます。

    result = list(executor.map(timeout_f, params))

または、独自のスレッド/プロセス コントロールを作成することもできます。

from multiprocessing import Process
from time import time

def chunks(l, n):
    """ Yield successive n-sized chunks from l. """
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

processes = [Process(target=f, args=(i,)) for i in params]
exit_codes = []
for five_processes = chunks(processes, 5):
    for p in five_processes:
        p.start()
    time_waited = 0
    start = time()
    for p in five_processes:
        if time_waited >= 100:
            p.join(0)
            p.terminate()
        p.join(100 - time_waited)
        p.terminate()
        time_waited = time() - start
    for p in five_processes:
        exit_codes.append(p.exit_code)

Can I get a return value from multiprocessing.Process? のような方法で戻り値を取得する必要があります。

プロセスの終了コードは、プロセスが完了した場合は 0 になり、プロセスが終了した場合はゼロ以外になります。

テクニック: Join a group of python processes with a timeout ,リストを均等なサイズのチャンクに分割するにはどうすればよいですか?


別のオプションとして、 multiprocessing.Poolで apply_async を使用することもできます。

from multiprocessing import Pool, TimeoutError
from time import sleep    

if __name__ == "__main__":
    pool = Pool(processes=5)
    processes = [pool.apply_async(f, [i]) for i in params]
    results = []
    for process in processes:
        try:
            result.append(process.get(timeout=100))
        except TimeoutError as e:
            results.append(e)

最初のプロセスが完了するまでに 50 秒かかる場合、2 番目のプロセスの実行時間は 50 秒余分にかかるため、上記は各プロセスで 100 秒以上待機する可能性があることに注意してください。より厳密なタイムアウトを適用するには、より複雑なロジック (前の例など) が必要です。

于 2014-09-22T15:35:38.713 に答える