5

前もって申し訳ありませんが、これは長くなります...

おそらく関連しています:

Python マルチプロセッシング atexit エラー「atexit._run_exitfuncs のエラー」

間違いなく関連:

グローバルデータを使用したpythonパラレルマップ(multiprocessing.Pool.map)

Python のマルチプロセッシング プールでのキーボード割り込み

これは、私の問題を説明するために一緒にハッキングした「単純な」スクリプトです...

import time
import multiprocessing as multi
import atexit

cleanup_stuff=multi.Manager().list([])

##################################################
# Some code to allow keyboard interrupts  
##################################################
was_interrupted=multi.Manager().list([])
class _interrupt(object):
    """
    Toy class to allow retrieval of the interrupt that triggered it's execution
    """
    def __init__(self,interrupt):
        self.interrupt=interrupt

def interrupt():
    was_interrupted.append(1)

def interruptable(func):
    """
    decorator to allow functions to be "interruptable" by
    a keyboard interrupt when in python's multiprocessing.Pool.map
    **Note**, this won't actually cause the Map to be interrupted,
    It will merely cause the following functions to be not executed.
    """
    def newfunc(*args,**kwargs):
        try:
            if(not was_interrupted):
                return func(*args,**kwargs)
            else:
                return False
        except KeyboardInterrupt as e:
            interrupt()
            return _interrupt(e)  #If we really want to know about the interrupt...
    return newfunc

@atexit.register
def cleanup():
    for i in cleanup_stuff:
        print(i)
    return

@interruptable
def func(i):
    print(i)
    cleanup_stuff.append(i)
    time.sleep(float(i)/10.)
    return i

#Must wrap func here, otherwise it won't be found in __main__'s dict
#Maybe because it was created dynamically using the decorator?
def wrapper(*args):
    return func(*args)


if __name__ == "__main__":

    #This is an attempt to use signals -- I also attempted something similar where
    #The signals were only caught in the child processes...Or only on the main process...
    #
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)

    #Try 2 with signals (only catch signal on main process)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)
    #def startup(): signal.signal(signal.SIGINT,signal.SIG_IGN)
    #p=multi.Pool(processes=4,initializer=startup)

    #Try 3 with signals (only catch signal on child processes)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,signal.SIG_IGN)
    #def startup(): signal.signal(signal.SIGINT,onSigInt)
    #p=multi.Pool(processes=4,initializer=startup)


    p=multi.Pool(4)
    try:
        out=p.map(wrapper,range(30))
        #out=p.map_async(wrapper,range(30)).get()  #This doesn't work either...

        #The following lines don't work either
        #Effectively trying to roll my own p.map() with p.apply_async 
        # results=[p.apply_async(wrapper,args=(i,)) for i in range(30)]
        # out = [ r.get() for r in results() ]
    except KeyboardInterrupt:
        print ("Hello!")
        out=None
    finally:
        p.terminate()
        p.join()

    print (out)

これは、KeyboardInterrupt が発生しない場合に問題なく機能します。ただし、1 つ上げると、次の例外が発生します。

10
7
9
12
^CHello!
None
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
   c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
   s.connect(address)
  File "<string>", line 1, in connect
error: [Errno 2] No such file or directory
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
    c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
    s.connect(address)
  File "<string>", line 1, in connect
socket.error: [Errno 2] No such file or directory

興味深いことに、コードは追加の関数を呼び出さずに Pool.map 関数を終了します...問題は、ある時点で KeyboardInterrupt が適切に処理されていないように思われますが、それがどこにあるのか少し混乱しています。なぜ割り込み可能で処理されないのですか。ありがとう。

使用すると同じ問題が発生することに注意してください out=p.map_async(wrapper,range(30)).get()

編集1

out=p.map(...)もう少し近づいて...を句で囲むtry,except,finallyと、最初の例外が取り除かれます...ただし、他の例外は引き続き atexit で発生します。上記のコードとトレースバックが更新されました。

編集2

上記のコードには、コメントとして機能しないものが追加されています。(同じエラー)。この試みは、次のことに触発されました。

http://jessenoller.com/2009/01/08/multiprocessingpool-and-keyboardinterrupt/

編集3

上記のコードに追加されたシグナルを使用して別の試みが失敗しました。

編集4

上記が不要になるようにコードを再構築する方法を見つけました。(ありそうもない)誰かが私と同じユースケースでこのスレッドに出くわした場合、私は私の解決策を説明します...

使用事例

モジュールを使用して一時ファイルを生成する関数がありtempfileます。プログラムの終了時にこれらの一時ファイルをクリーンアップしたいと思います。私の最初の試みは、各一時ファイル名をリストにパックしてから、 を介して登録された関数を使用してリストのすべての要素を削除することでしたatexit.register。問題は、更新されたリストが複数のプロセスにわたって更新されていなかったことです。multiprocessing.Managerここで、リスト データを管理するために を使用するというアイデアを思いつきました。残念ながら、KeyboardInterrupt何らかの理由でプロセス間の通信ソケットが壊れているため、これはどれだけ頑張っても失敗します。この問題の解決策は簡単です。マルチプロセッシングを使用する前に、一時ファイル ディレクトリを設定します。tempfile.tempdir=tempfile.mkdtemp()次に、一時ディレクトリを削除する関数を登録します。各プロセスは同じ一時ディレクトリに書き込むため、機能します。 もちろん、このソリューションは、共有データが、プログラムの寿命の終わりに削除する必要があるファイルのリストである場合にのみ機能します。

4

0 に答える 0