Python で関数を呼び出していますが、これが停止してスクリプトを強制的に再起動する可能性があることがわかっています。
関数を呼び出すにはどうすればよいですか、または関数を何にラップして、5 秒以上かかる場合にスクリプトが関数をキャンセルして別のことを行うようにしますか?
Python で関数を呼び出していますが、これが停止してスクリプトを強制的に再起動する可能性があることがわかっています。
関数を呼び出すにはどうすればよいですか、または関数を何にラップして、5 秒以上かかる場合にスクリプトが関数をキャンセルして別のことを行うようにしますか?
UNIX で実行している場合は、 signalパッケージを使用できます。
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print("Forever is over!")
...: raise Exception("end of time")
...:
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print("sec")
...: time.sleep(1)
...:
...:
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print(exc)
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
call の 10 秒後にsignal.alarm(10)
、ハンドラが呼び出されます。これにより、通常の Python コードからインターセプトできる例外が発生します。
このモジュールはスレッドではうまく機能しません (しかし、誰がそうするでしょうか?)
タイムアウトが発生すると例外が発生するため、関数内でキャッチされて無視される可能性があることに注意してください。たとえば、そのような関数の 1 つです。
def loop_forever():
while 1:
print('sec')
try:
time.sleep(10)
except:
continue
multiprocessing.Process
まさにそれを行うために使用できます。
コード
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate - may not work if process is stuck for good
p.terminate()
# OR Kill - will work for sure, no chance for process to finish nicely however
# p.kill()
p.join()
関数を呼び出すにはどうすればよいですか、または 5 秒以上かかる場合にスクリプトがキャンセルするようにラップするにはどうすればよいですか?
この質問/問題をデコレータとthreading.Timer
. ここに内訳があります。
Python 2 および 3 でテストされました。Unix/Linux および Windows でも動作するはずです。
まずは輸入物。これらは、Python のバージョンに関係なく、コードの一貫性を維持しようとします。
from __future__ import print_function
import sys
import threading
from time import sleep
try:
import thread
except ImportError:
import _thread as thread
バージョンに依存しないコードを使用:
try:
range, _print = xrange, print
def print(*args, **kwargs):
flush = kwargs.pop('flush', False)
_print(*args, **kwargs)
if flush:
kwargs.get('file', sys.stdout).flush()
except NameError:
pass
これで、標準ライブラリから機能をインポートできました。
exit_after
デコレータmain()
次に、子スレッドから終了する関数が必要です。
def quit_function(fn_name):
# print to stderr, unbuffered in Python 2.
print('{0} took too long'.format(fn_name), file=sys.stderr)
sys.stderr.flush() # Python 3 stderr is likely buffered.
thread.interrupt_main() # raises KeyboardInterrupt
そして、デコレータ自体は次のとおりです。
def exit_after(s):
'''
use as decorator to exit process if
function takes longer than s seconds
'''
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer
5 秒後に終了することについての質問に直接答える使用法を次に示します。
@exit_after(5)
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
sleep(1)
print('countdown finished')
デモ:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt
2 番目の関数呼び出しは終了しません。代わりに、プロセスはトレースバックで終了する必要があります。
KeyboardInterrupt
スリープ中のスレッドを常に停止するとは限りませんWindows 上の Python 2 では、キーボード割り込みによってスリープが常に中断されるとは限らないことに注意してください。
@exit_after(1)
def sleep10():
sleep(10)
print('slept 10 seconds')
>>> sleep10()
sleep10 took too long # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt
Cython 、Python、および KeyboardInterrupt を無視PyErr_CheckSignals()
するを参照 してください。
いずれにせよ、スレッドを 1 秒以上スリープさせないようにします。これは、プロセッサ時間の 1 イオンです。
関数を呼び出すにはどうすればよいですか、または関数を何にラップして、5 秒以上かかる場合にスクリプトが関数をキャンセルして別のことを行うようにしますか?
それをキャッチして別のことを行うには、KeyboardInterrupt をキャッチできます。
>>> try:
... countdown(10)
... except KeyboardInterrupt:
... print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
純粋な関数 (スレッドの提案と同じ API を使用) であり、正常に動作するように見える別の提案があります (このスレッドの提案に基づく)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
timeout-decorator
Windows システムでは動作しません。Windows はうまくサポートされていませんでしsignal
た。
Windows システムで timeout-decorator を使用すると、次のようになります。
AttributeError: module 'signal' has no attribute 'SIGALRM'
使用を提案した人use_signals=False
もいましたが、私にはうまくいきませんでした。
著者 @bitranox は、次のパッケージを作成しました。
pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
コードサンプル:
import time
from wrapt_timeout_decorator import *
@timeout(5)
def mytest(message):
print(message)
for i in range(1,10):
time.sleep(1)
print('{} seconds have passed'.format(i))
def main():
mytest('starting')
if __name__ == '__main__':
main()
次の例外があります。
TimeoutError: Function mytest timed out after 5 seconds
同じために信号を使用できます。以下の例が参考になると思います。スレッドに比べて非常にシンプルです。
import signal
def timeout(signum, frame):
raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
print 'Starting Main ',
while 1:
print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
main()
except myException:
print "whoops"
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
私は同じ問題に直面しましたが、私の状況はサブスレッドで作業する必要があり、シグナルが機能しなかったため、python パッケージを作成しました: この問題を解決するタイムアウトタイマー、コンテキストまたはデコレーターとしての使用のサポート、シグナルの使用またはタイムアウト割り込みをトリガーするサブスレッドモジュール:
from timeout_timer import timeout, TimeoutInterrupt
class TimeoutInterruptNested(TimeoutInterrupt):
pass
def test_timeout_nested_loop_both_timeout(timer="thread"):
cnt = 0
try:
with timeout(5, timer=timer):
try:
with timeout(2, timer=timer, exception=TimeoutInterruptNested):
sleep(2)
except TimeoutInterruptNested:
cnt += 1
time.sleep(10)
except TimeoutInterrupt:
cnt += 1
assert cnt == 2
これは、指定されたスレッドベースのソリューションをわずかに改善したものです。
以下のコードは例外をサポートしています:
def runFunctionCatchExceptions(func, *args, **kwargs):
try:
result = func(*args, **kwargs)
except Exception, message:
return ["exception", message]
return ["RESULT", result]
def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
import threading
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = default
def run(self):
self.result = runFunctionCatchExceptions(func, *args, **kwargs)
it = InterruptableThread()
it.start()
it.join(timeout_duration)
if it.isAlive():
return default
if it.result[0] == "exception":
raise it.result[1]
return it.result[1]
5 秒のタイムアウトで呼び出す:
result = timeout(remote_calculate, (myarg,), timeout_duration=5)
これは、以前の回答の多くを組み合わせて次の機能を提供する POSIX バージョンです。
コードといくつかのテストケースは次のとおりです。
import threading
import signal
import os
import time
class TerminateExecution(Exception):
"""
Exception to indicate that execution has exceeded the preset running time.
"""
def quit_function(pid):
# Killing all subprocesses
os.setpgrp()
os.killpg(0, signal.SIGTERM)
# Killing the main thread
os.kill(pid, signal.SIGTERM)
def handle_term(signum, frame):
raise TerminateExecution()
def invoke_with_timeout(timeout, fn, *args, **kwargs):
# Setting a sigterm handler and initiating a timer
old_handler = signal.signal(signal.SIGTERM, handle_term)
timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
terminate = False
# Executing the function
timer.start()
try:
result = fn(*args, **kwargs)
except TerminateExecution:
terminate = True
finally:
# Restoring original handler and cancel timer
signal.signal(signal.SIGTERM, old_handler)
timer.cancel()
if terminate:
raise BaseException("xxx")
return result
### Test cases
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
time.sleep(1)
print('countdown finished')
return 1337
def really_long_function():
time.sleep(10)
def really_long_function2():
os.system("sleep 787")
# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337
# Testing various scenarios
t1 = time.time()
try:
print(invoke_with_timeout(1, countdown, 3))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function2))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)
class X:
def __init__(self):
self.value = 0
def set(self, v):
self.value = v
x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9