3

さまざまなパラメータを使用して、頻繁に実行する必要のある実行可能ファイルがあります。このために、ここに示すパターンに従って、マルチプロセッシングモジュールを使用して小さなPython(2.7)ラッパーを作成しました。

私のコードは次のようになります。

try:
     logging.info("starting pool runs")
     pool.map(run_nlin, params)
     pool.close()
 except KeyboardInterrupt:
     logging.info("^C pressed")
     pool.terminate()
 except Exception, e:
     logging.info("exception caught: ", e)
     pool.terminate()
 finally:
     time.sleep(5)
     pool.join()
     logging.info("done")

私のワーカー関数はここにあります:

class KeyboardInterruptError(Exception): pass

def run_nlin((path_config, path_log, path_nlin, update_method)):
    try:
        with open(path_log, "w") as log_:
            cmdline = [path_nlin, path_config]
            if update_method:
                cmdline += [update_method, ]
            sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        time.sleep(5)
        raise KeyboardInterruptError()
    except:
        raise

path_configバイナリプログラムの設定ファイルへのパスです。たとえば、プログラムを実行する日付があります。

ラッパーを起動すると、すべてが正常に見えます。ただし、を押すと、ラッパースクリプトは、終了する前にプールから^C追加のプロセスを起動するようです。numproc例として、1〜10日目にスクリプトを開始すると、ps aux出力でバイナリプログラムの2つのインスタンスが実行されていることがわかります(通常は1日目と3日目)。ここで、を押す^Cと、ラッパースクリプトが終了し、1日目と3日目のバイナリプログラムはなくなりますが、5日目と7日目に実行されている新しいバイナリプログラムがあります。

だから私には、最終的に死ぬ前にPool別のプロセスを起動するように見えます。numproc

ここで何が起こっているのか、そして私がそれについて何ができるのか、何かアイデアはありますか?

4

1 に答える 1

12

このページで、マルチプロセッシングモジュールの作成者であるJesse Nollerは、処理する正しい方法KeyboardInterruptは、サブプロセスを返すことであり、例外を再発生させることではないことを示しています。これにより、メインプロセスがプールを終了できるようになります。

ただし、以下のコードが示すように、によって生成されたすべてのタスクが実行されるまで、メインプロセスはブロックに到達ませexcept KeyboardInterruptん。これが、押された後にワーカー関数への余分な呼び出しが表示される理由です(私は信じています) 。pool.maprun_nlinCtrl-C

考えられる回避策の1つは、が設定されているかどうかをすべてのワーカー関数でテストすることmultiprocessing.Eventです。イベントが設定されている場合は、ワーカーを早期に救済してもらいます。それ以外の場合は、長い計算を続行します。


import logging
import multiprocessing as mp
import time

logger = mp.log_to_stderr(logging.WARNING)

def worker(x):
    try:
        if not terminating.is_set():
            logger.warn("Running worker({x!r})".format(x = x))
            time.sleep(3)
        else:
            logger.warn("got the message... we're terminating!")
    except KeyboardInterrupt:
        logger.warn("terminating is set")        
        terminating.set()
    return x

def initializer(terminating_):
    # This places terminating in the global namespace of the worker subprocesses.
    # This allows the worker function to access `terminating` even though it is
    # not passed as an argument to the function.
    global terminating
    terminating = terminating_

def main():
    terminating = mp.Event()    
    result = []
    pool = mp.Pool(initializer=initializer, initargs=(terminating, ))
    params = range(12)
    try:
         logger.warn("starting pool runs")
         result = pool.map(worker, params)
         pool.close()
    except KeyboardInterrupt:
        logger.warn("^C pressed")
        pool.terminate()
    finally:
        pool.join()
        logger.warn('done: {r}'.format(r = result))

if __name__ == '__main__':
    main()

スクリプトを実行すると、次のようになります。

% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)

ここでCtrl-Cが押されます。各労働者がterminatingイベントを設定します。設定するのに必要なのは1つだけですが、これはわずかな非効率にもかかわらず機能します。

  C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set

これで、キューに入れられた他のすべてのタスクpool.mapが実行されます。

[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!

最後に、メインプロセスがexcept KeyboardInterruptブロックに到達します。

[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []
于 2013-01-29T13:11:33.780 に答える