150

Python のマルチプロセッシング プールで KeyboardInterrupt イベントを処理するにはどうすればよいですか? 以下に簡単な例を示します。

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

上記のコードを実行すると、KeyboardInterruptを押す^Cと が発生しますが、プロセスはその時点で単にハングするため、外部で強制終了する必要があります。

^Cいつでも押して、すべてのプロセスを正常に終了できるようにしたいと考えています。

4

11 に答える 11

139

これは Python のバグです。threading.Condition.wait() で条件を待機している場合、KeyboardInterrupt は送信されません。再現:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

KeyboardInterrupt 例外は、wait() が戻るまで配信されず、決して返されないため、割り込みは発生しません。KeyboardInterrupt は、ほぼ確実に条件待ちを中断するはずです。

タイムアウトが指定されている場合、これは発生しないことに注意してください。cond.wait(1) はすぐに割り込みを受け取ります。したがって、回避策はタイムアウトを指定することです。そのためには、

    results = pool.map(slowly_square, range(40))

    results = pool.map_async(slowly_square, range(40)).get(9999999)

または類似。

于 2009-09-11T00:45:17.267 に答える
62

私が最近見つけたことから、最善の解決策は、SIGINT を完全に無視するようにワーカー プロセスを設定し、すべてのクリーンアップ コードを親プロセスに限定することです。これにより、アイドル プロセスとビジー ワーカー プロセスの両方の問題が修正され、子プロセスでエラー処理コードが不要になります。

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

説明と完全なサンプル コードは、それぞれhttp://noswap.com/blog/python-multiprocessing-keyboardinterrupt/http://github.com/jreese/multiprocessing-keyboardinterruptにあります。

于 2011-05-31T18:39:47.170 に答える
31

いくつかの理由で、基本Exceptionクラスから継承された例外のみが正常に処理されます。KeyboardInterrupt回避策として、Exceptionインスタンスとして自分を再レイズすることができます。

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

通常、次の出力が得られます。

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

したがって、を押す^Cと、次のようになります。

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
于 2010-04-01T16:06:35.830 に答える
9

通常、この単純な構造は-on Pool:でCtrl機能します。C

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

いくつかの同様の投稿で述べられているように:

試行せずにPythonでkeyboardinterruptをキャプチャします-例外

于 2012-10-31T13:44:37.497 に答える
4

当面の最善の解決策は、 multiprocessing.pool 機能を使用せず、独自のプール機能をロールすることであることがわかりました。apply_async のエラーを示す例と、プール機能を完全に使用しないようにする方法を示す例を提供しました。

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

于 2010-08-26T17:16:12.710 に答える
4

次のように、Pool オブジェクトの apply_async メソッドを使用してみてください。

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

出力:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

この方法の利点は、中断前に処理された結果が結果ディクショナリに返されることです。

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
于 2018-08-14T05:30:30.360 に答える
-5

不思議なことKeyboardInterruptに、子供たちにも対処しなければならないようです。私はこれが書かれたように機能することを期待していました...次のように変更slowly_squareしてみてください:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

期待どおりに機能するはずです。

于 2009-09-11T00:26:22.800 に答える