タイムアウトを超えた後に関数の実行を終了させたい場合は、その機能を持つ次のコードを試してください。モジュールを使用するには、関数を add_timeout の引数として呼び出すだけで、その戻り値を実行できます。呼び出した後は、オブジェクトの ready プロパティをポーリングでき、返されたものはすべて value プロパティを介してアクセスできます。利用可能な残りの API については、コードのドキュメントに説明が記載されているはずです。
[長さのために編集されたコード]ソースについては、 timeout.pyを参照してください。
補遺:
問題を見る別の方法があります。有効期限を持つように関数をデコレートする代わりに、関数とその引数を実行エンジンに送信して、実行できる時間にタイムアウトを強制することができます。上記の最初の回答は、約 8 年前にこの問題を解決したものでした。この件についてさらに調査した結果、代わりに次のモジュールをお勧めします。それに続くモジュールは、asynchronous モジュールの使用法をテストおよびデモンストレーションするために使用されます。
asynchronous.py
#! /usr/bin/env python3
import abc as _abc
import collections as _collections
import enum as _enum
import math as _math
import multiprocessing as _multiprocessing
import operator as _operator
import queue as _queue
import signal as _signal
import sys as _sys
import time as _time
# Public API: the Executor class plus the module-level shortcuts that this
# module binds to a shared default Executor instance.
__all__ = (
    'Executor',
    'get_timeout',
    'set_timeout',
    'submit',
    'map_',
    'shutdown'
)
class _Base(metaclass=_abc.ABCMeta):
    """Abstract base that stores and validates a ``timeout`` in seconds.

    Subclasses must override ``__init__`` (it is abstract) and call
    ``super().__init__(timeout)`` so the value is validated and stored.
    """

    __slots__ = (
        '__timeout',
    )

    @_abc.abstractmethod
    def __init__(self, timeout):
        """Store *timeout*; ``None`` means "no limit" and becomes ``inf``."""
        self.timeout = _math.inf if timeout is None else timeout

    def get_timeout(self):
        """Return the currently configured timeout."""
        return self.__timeout

    def set_timeout(self, value):
        """Validate and store a new timeout.

        Raises:
            TypeError: if *value* is not a ``float`` or an ``int``.
            ValueError: if *value* is not strictly greater than zero
                (this includes NaN).
        """
        if not isinstance(value, (float, int)):
            raise TypeError('value must be of type float or int')
        # "not value > 0" instead of "value <= 0": every comparison with NaN
        # is False, so the old test let NaN through and a NaN timeout can
        # never be exceeded by "elapsed > timeout".
        if not value > 0:
            raise ValueError('value must be greater than zero')
        self.__timeout = value

    # Attribute-style access that goes through the same validation.
    timeout = property(get_timeout, set_timeout)
def _run_and_catch(fn, args, kwargs):
    """Call ``fn(*args, **kwargs)`` and report the outcome as a pair.

    Returns ``(False, result)`` when the call returns normally and
    ``(True, exception)`` when it raises anything at all, including
    ``BaseException`` subclasses such as ``SystemExit``.
    """
    # noinspection PyBroadException
    try:
        outcome = fn(*args, **kwargs)
    except BaseException as exc:
        return True, exc
    return False, outcome
def _run(fn, args, kwargs, queue):
    """Process target: run the call and put its outcome pair on *queue*.

    The pair comes from ``_run_and_catch``.  ``put_nowait`` is used, so a
    queue that is already full raises ``queue.Full`` here.
    """
    # NOTE: the test helper ``cause_error`` finds this frame's local named
    # ``queue`` two frames up from the called function, so keep both the
    # parameter name and the call depth as they are.
    outcome = _run_and_catch(fn, args, kwargs)
    queue.put_nowait(outcome)
class _State(_enum.IntEnum):
    """Lifecycle states of a future.

    The numeric order is significant: the future code compares states with
    ``<`` and ``>`` (for example "done" means greater than RUNNING).
    """

    PENDING = 1
    RUNNING = 2
    CANCELLED = 3
    FINISHED = 4
    ERROR = 5
def _run_and_catch_loop(iterable, *args, **kwargs):
    """Call every function from *iterable*, then re-raise the last failure.

    Each function is called as ``fn(*args, **kwargs)`` through
    ``_run_and_catch``, so one failing function does not stop the remaining
    ones from running.  If any call raised, the exception from the last
    failing call is raised once the loop has finished.
    """
    exception = None
    for fn in iterable:
        error, value = _run_and_catch(fn, args, kwargs)
        if error:
            exception = value
    # Compare with None rather than testing truthiness: an exception whose
    # class defines __bool__ or __len__ can be falsy and would otherwise be
    # swallowed silently.
    if exception is not None:
        raise exception
class _Future(_Base):
    """Track a single call that runs in its own child process.

    The outcome travels back through a one-slot multiprocessing queue as an
    ``(error, value)`` pair; until one has been received the stored result
    is ``(True, TimeoutError())``.  The lifecycle state is never stored: it
    is derived on demand from the child process's ``pid`` and ``exitcode``.
    """

    __slots__ = (
        '__queue',
        '__process',
        '__start_time',
        '__callbacks',
        '__result'
    )

    def __init__(self, timeout, fn, args, kwargs):
        # Prepare (but do not start) a daemon process whose target is _run.
        super().__init__(timeout)
        self.__queue = _multiprocessing.Queue(1)
        self.__process = _multiprocessing.Process(
            target=_run,
            args=(fn, args, kwargs, self.__queue),
            daemon=True
        )
        # inf marks "not started yet"; it makes the elapsed time computed in
        # __auto_cancel equal to -inf, so a pending future never auto-cancels.
        self.__start_time = _math.inf
        self.__callbacks = _collections.deque()
        # Default outcome reported when no result pair is ever received.
        self.__result = True, TimeoutError()

    @property
    def __state(self):
        # Derive the state from the process: no pid -> PENDING, no exit code
        # -> RUNNING, exit code -SIGTERM -> CANCELLED, 0 -> FINISHED, any
        # other exit code -> ERROR.
        # NOTE(review): -SIGTERM presumably matches how Process.terminate()
        # ends the child on POSIX -- confirm for other platforms.
        pid, exitcode = self.__process.pid, self.__process.exitcode
        return (_State.PENDING if pid is None else
                _State.RUNNING if exitcode is None else
                _State.CANCELLED if exitcode == -_signal.SIGTERM else
                _State.FINISHED if exitcode == 0 else
                _State.ERROR)

    def __repr__(self):
        # PENDING/RUNNING show only the state; later states also show
        # whether the stored result is a raised or a returned value and the
        # type name of that value.
        root = f'{type(self).__name__} at {id(self)} state={self.__state.name}'
        if self.__state < _State.CANCELLED:
            return f'<{root}>'
        error, value = self.__result
        suffix = f'{"raised" if error else "returned"} {type(value).__name__}'
        return f'<{root} {suffix}>'

    def __consume_callbacks(self):
        # Yield callbacks while removing them from the deque, so each
        # registered callback is handed out at most once.
        while self.__callbacks:
            yield self.__callbacks.popleft()

    def __invoke_callbacks(self):
        # Wait for the child to exit, then call every pending callback with
        # this future; _run_and_catch_loop re-raises the last callback error.
        self.__process.join()
        _run_and_catch_loop(self.__consume_callbacks(), self)

    def cancel(self):
        # Terminate the child process and run the done callbacks.
        self.__process.terminate()
        self.__invoke_callbacks()

    def __auto_cancel(self):
        # Cancel the call if it has run longer than the timeout; return the
        # elapsed time in seconds either way.
        elapsed_time = _time.perf_counter() - self.__start_time
        if elapsed_time > self.timeout:
            self.cancel()
        return elapsed_time

    def cancelled(self):
        # True if the child was terminated (after enforcing the timeout).
        self.__auto_cancel()
        return self.__state is _State.CANCELLED

    def running(self):
        # True if the child has started and has not exited yet (after
        # enforcing the timeout).
        self.__auto_cancel()
        return self.__state is _State.RUNNING

    def done(self):
        # True for every state after RUNNING: CANCELLED, FINISHED, ERROR.
        self.__auto_cancel()
        return self.__state > _State.RUNNING

    def __handle_result(self, error, value):
        # Store the received outcome pair and run the done callbacks.
        self.__result = error, value
        self.__invoke_callbacks()

    def __ensure_termination(self):
        # Enforce the timeout, then collect the outcome: take it right away
        # if it is already queued, otherwise (while still PENDING/RUNNING)
        # block for the remaining time and cancel if nothing arrives.
        elapsed_time = self.__auto_cancel()
        if not self.__queue.empty():
            self.__handle_result(*self.__queue.get_nowait())
        elif self.__state < _State.CANCELLED:
            remaining_time = self.timeout - elapsed_time
            # Queue.get takes None, not inf, to mean "wait without limit".
            if remaining_time == _math.inf:
                remaining_time = None
            try:
                result = self.__queue.get(True, remaining_time)
            except _queue.Empty:
                self.cancel()
            else:
                self.__handle_result(*result)

    def result(self):
        # Wait for the outcome; raise it if it is an error, else return it.
        self.__ensure_termination()
        error, value = self.__result
        if error:
            raise value
        return value

    def exception(self):
        # Wait for the outcome; return the exception if it is an error,
        # otherwise fall through and return None.
        self.__ensure_termination()
        error, value = self.__result
        if error:
            return value

    def add_done_callback(self, fn):
        # Call fn(self) immediately if already done, else queue it for later.
        if self.done():
            fn(self)
        else:
            self.__callbacks.append(fn)

    def _set_running_or_notify_cancel(self):
        # First call starts the child and the timeout clock; calling it
        # again once the future is no longer PENDING cancels the future.
        if self.__state is _State.PENDING:
            self.__process.start()
            self.__start_time = _time.perf_counter()
        else:
            self.cancel()
class Executor(_Base):
    """Submit calls that each run in their own process under a timeout.

    The executor's ``timeout`` is passed to every future it creates.  It
    keeps a set of the futures that are still outstanding so ``shutdown``
    can cancel them, and it can be used as a context manager that shuts
    down on exit.
    """

    __slots__ = (
        '__futures',
    )

    def __init__(self, timeout=None):
        """Create an executor; ``timeout=None`` means no time limit."""
        super().__init__(timeout)
        self.__futures = set()

    def submit(self, fn, *args, **kwargs):
        """Start ``fn(*args, **kwargs)`` and return its future."""
        task = _Future(self.timeout, fn, args, kwargs)
        self.__futures.add(task)
        # The future drops itself from the tracking set once it is done.
        task.add_done_callback(self.__futures.remove)
        # noinspection PyProtectedMember
        task._set_running_or_notify_cancel()
        return task

    @staticmethod
    def __cancel_futures(iterable):
        # Cancel every future; the last error raised by a cancel call is
        # re-raised after all of them have been attempted.
        cancel_methods = (future.cancel for future in iterable)
        _run_and_catch_loop(cancel_methods)

    def map(self, fn, *iterables):
        """Submit ``fn`` for each zipped argument tuple; yield results in order.

        All calls are submitted right away.  The returned generator yields
        ``future.result()`` for each one in submission order; when it stops
        for any reason, the futures it has not reached yet are cancelled.
        """
        submitted = tuple(
            self.submit(fn, *call_args) for call_args in zip(*iterables)
        )

        def result_iterator():
            pending = iter(submitted)
            try:
                for task in pending:
                    yield task.result()
            finally:
                self.__cancel_futures(pending)

        return result_iterator()

    def shutdown(self):
        """Cancel every future that is still being tracked."""
        # Iterate over a snapshot: done callbacks mutate the live set.
        snapshot = frozenset(self.__futures)
        self.__cancel_futures(snapshot)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()
        # Never suppress an exception raised inside the with-block.
        return False
# Build one default Executor (no timeout argument, so no time limit) and
# expose its bound methods as the module-level functions named in __all__.
_executor = Executor()
get_timeout = _executor.get_timeout
set_timeout = _executor.set_timeout
submit = _executor.submit
map_ = _executor.map
shutdown = _executor.shutdown
# Remove the module-level name; the bound methods above still reference the
# instance.
del _executor
test_asynchronous.py
#! /usr/bin/env python3
import _thread
import atexit
import functools
import inspect
import io
import math
import operator
import os
import sys
import time
import unittest
import asynchronous
# noinspection PyUnresolvedReferences
class TestConstructor:
def instantiate(self, *args):
parameters = len(inspect.signature(self.CLASS).parameters)
return self.CLASS(*args[:parameters])
def test_valid_timeout(self):
instance = self.instantiate(None, print, (), {})
self.assertEqual(instance.get_timeout(), math.inf)
instance = self.instantiate(1, print, (), {})
self.assertEqual(instance.get_timeout(), 1)
float_timeout = (math.e ** (1j * math.pi) + 1).imag
self.assertIsInstance(float_timeout, float)
instance = self.instantiate(float_timeout, print, (), {})
self.assertEqual(instance.get_timeout(), float_timeout)
def test_error_timeout(self):
self.assertRaises(TypeError, self.instantiate, '60', print, (), {})
self.assertRaises(ValueError, self.instantiate, 0, print, (), {})
self.assertRaises(ValueError, self.instantiate, -1, print, (), {})
# noinspection PyUnresolvedReferences
class TestTimeout(TestConstructor):
    """Mixin with tests for the ``timeout`` property of ``self.CLASS``."""

    def test_valid_property(self):
        subject = self.instantiate(None, None, None, None)
        subject.timeout = 1
        self.assertIsInstance(subject.timeout, int)
        subject.timeout = 1 / 2
        self.assertIsInstance(subject.timeout, float)
        # An arbitrarily large integer built from 128 random bytes.
        huge = int.from_bytes(os.urandom(128), 'big')
        subject.timeout = huge
        self.assertEqual(subject.timeout, huge)

    def test_error_property(self):
        subject = self.instantiate(None, None, None, None)
        rejected = (
            (TypeError, 'inf'),
            (TypeError, complex(123456789, 0)),
            (ValueError, 0),
            (ValueError, 0.0),
            (ValueError, -1),
            (ValueError, -math.pi),
        )
        for expected, bad_value in rejected:
            with self.assertRaises(expected):
                subject.timeout = bad_value
            # A rejected assignment must leave the old value in place.
            self.assertEqual(subject.timeout, math.inf)
class Timer:
    """Per-thread stopwatch keyed by the calling thread's identifier."""

    __timers = {}

    @classmethod
    def start_timer(cls):
        """Record the start time for the calling thread.

        Raises ``KeyError`` if this thread already has a running timer.
        """
        ident = _thread.get_ident()
        now = time.perf_counter()
        if ident in cls.__timers:
            raise KeyError(ident)
        cls.__timers[ident] = now

    @classmethod
    def stop_timer(cls, expected_time, error=None):
        """Stop this thread's timer and compare it with *expected_time*.

        Returns True when the elapsed time is within *error* seconds of
        *expected_time*; *error* defaults to a quarter second.
        """
        tolerance = 1 / 4 if error is None else error
        elapsed = time.perf_counter() - cls.__timers.pop(_thread.get_ident())
        return abs(elapsed - expected_time) <= tolerance
# noinspection PyUnresolvedReferences
class TestTimer(Timer):
    """Timer whose ``stop_timer`` asserts instead of returning a boolean."""

    def stop_timer(self, expected_time, error=None):
        """Fail the test unless the elapsed time is within tolerance."""
        within_tolerance = super().stop_timer(expected_time, error)
        self.assertTrue(within_tolerance)
def delay_run(delay, fn, *args, sync=True, **kwargs):
    """Sleep for *delay* seconds, then call ``fn(*args, **kwargs)``.

    With ``sync=True`` (the default) this happens in the calling thread and
    the call's result is returned.  With ``sync=False`` it happens in a new
    thread and ``None`` is returned immediately.
    """
    def wrapper():
        time.sleep(delay)
        return fn(*args, **kwargs)

    if not sync:
        _thread.start_new_thread(wrapper, ())
        return None
    return wrapper()
# noinspection PyUnresolvedReferences
class TestModuleOrInstance(TestTimer):
    """Mixin with tests shared by an executor-like module or instance.

    The concrete test case supplies ``MODULE_OR_INSTANCE`` (the object under
    test) and ``NAME_OF_MAP`` (the name of its map function).
    """

    @property
    def moi(self):
        # Short alias for the module or instance under test.
        return self.MODULE_OR_INSTANCE

    def test_valid_timeout(self):
        self.moi.set_timeout(math.inf)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.moi.set_timeout(60)
        self.assertEqual(self.moi.get_timeout(), 60)
        self.moi.set_timeout(0.05)
        self.assertEqual(self.moi.get_timeout(), 0.05)

    def test_error_timeout(self):
        # Each rejected value must leave the previous timeout untouched.
        self.moi.set_timeout(math.inf)
        self.assertRaises(TypeError, self.moi.set_timeout, None)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.assertRaises(ValueError, self.moi.set_timeout, 0)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.assertRaises(ValueError, self.moi.set_timeout, -1)
        self.assertEqual(self.moi.get_timeout(), math.inf)

    def run_submit_check(self):
        # Submit a half-second job and check its repr, result and duration.
        self.start_timer()
        future = self.moi.submit(delay_run, 0.5, operator.add, 1, 2)
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED returned int>$'
        )

    def test_submit_one_second_timeout(self):
        self.moi.set_timeout(1)
        self.run_submit_check()

    def test_submit_no_timeout(self):
        self.moi.set_timeout(math.inf)
        self.run_submit_check()

    def test_submit_short_timeout(self):
        # A one-second job under a half-second timeout must be cancelled
        # after roughly half a second and report TimeoutError.
        self.moi.set_timeout(0.5)
        self.start_timer()
        future = self.moi.submit(delay_run, 1, operator.add, 1, 2)
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        self.assertIsInstance(future.exception(), TimeoutError)
        self.stop_timer(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
        )

    def run_map(self, *args):
        # Call the map function under test with delay_run as the worker.
        return getattr(self.moi, self.NAME_OF_MAP)(delay_run, *args)

    def test_valid_map(self):
        # Four one-second jobs, each adding two numbers that sum to 3.
        self.moi.set_timeout(1.5)
        for result in self.run_map(
            [1, 1, 1, 1],
            [operator.add] * 4,
            [0, 1, 2, 3],
            [3, 2, 1, 0]
        ):
            self.assertEqual(result, 3)

    def test_error_map(self):
        # The third job sleeps 2 seconds, longer than the 1.5 second
        # timeout, so exactly two results arrive before TimeoutError.
        self.moi.set_timeout(1.5)
        success = 0
        with self.assertRaises(TimeoutError):
            for result in self.run_map(
                [1, 1, 2, 1],
                [operator.add] * 4,
                [0, 1, 2, 3],
                [3, 2, 1, 0]
            ):
                self.assertEqual(result, 3)
                success += 1
        self.assertEqual(success, 2)

    def run_shutdown_check(self, running, future):
        # Done callback: the future must report TimeoutError, then it is
        # removed from the bookkeeping set.
        self.assertRaises(TimeoutError, future.result)
        running.remove(future)

    def run_submit_loop(self, executor):
        # Submit ten two-second jobs, each with the done callback above, and
        # return the set of futures that are still expected to be running.
        running = set()
        done_callback = functools.partial(self.run_shutdown_check, running)
        for _ in range(10):
            future = executor.submit(delay_run, 2, operator.add, 10, 20)
            running.add(future)
            future.add_done_callback(done_callback)
        time.sleep(0.5)
        return running

    def test_valid_shutdown(self):
        # shutdown() must trigger every done callback, emptying the set.
        self.moi.set_timeout(1.5)
        running = self.run_submit_loop(self.moi)
        self.moi.shutdown()
        self.assertFalse(running)

    def test_error_shutdown(self):
        # Removing one future up front makes its done callback raise
        # KeyError; shutdown() must surface that error and still process
        # the other futures.
        self.moi.set_timeout(1.5)
        running = self.run_submit_loop(self.moi)
        running.pop()
        self.assertRaises(KeyError, self.moi.shutdown)
        self.assertFalse(running)
class TestExecutorAPI(TestTimeout, TestModuleOrInstance, unittest.TestCase):
    """Run the shared tests against an ``Executor`` instance.

    Adds tests for using the executor as a context manager.
    """

    CLASS = asynchronous.Executor
    MODULE_OR_INSTANCE = CLASS()
    NAME_OF_MAP = 'map'

    def test_valid_context_manager(self):
        # Leaving the with-block must cancel every outstanding future.
        with self.instantiate(1.5) as pool:
            outstanding = self.run_submit_loop(pool)
        self.assertFalse(outstanding)

    def test_error_context_manager(self):
        # An exception raised inside the block propagates unchanged and the
        # futures are still cancelled.
        sentinel = Exception()
        with self.assertRaises(Exception) as caught:
            with self.instantiate(1.5) as pool:
                outstanding = self.run_submit_loop(pool)
                raise sentinel
        self.assertIs(caught.exception, sentinel)
        self.assertFalse(outstanding)
        # A failing done callback (KeyError for the popped future) surfaces
        # from the with-statement's exit.
        with self.assertRaises(KeyError):
            with self.instantiate(1.5) as pool:
                outstanding = self.run_submit_loop(pool)
                outstanding.pop()
        self.assertFalse(outstanding)
class TestModuleAPI(TestModuleOrInstance, unittest.TestCase):
    """Run the shared tests against the module-level functions."""

    # The asynchronous module itself is the object under test; its map
    # function is exported under the name ``map_``.
    MODULE_OR_INSTANCE = asynchronous
    NAME_OF_MAP = 'map_'
def verify_error():
    """Forward captured stderr to the real stderr unless it is expected.

    ``sys.stderr`` is expected to be a seekable text buffer.  If any captured
    line is exactly ``queue.Full`` the output is dropped; otherwise the whole
    buffer is written to ``sys.__stderr__``.
    """
    captured = sys.stderr
    captured.seek(0, io.SEEK_SET)
    if any(line == 'queue.Full\n' for line in captured):
        return
    captured.seek(0, io.SEEK_SET)
    sys.__stderr__.write(captured.read())
    sys.__stderr__.flush()
def cause_error(obj):
    """Put *obj* on the caller's caller's ``queue`` and capture stderr.

    ``sys.stderr`` is replaced by an in-memory buffer and ``verify_error``
    is registered to inspect that buffer at interpreter exit.
    """
    sys.stderr = io.StringIO()
    atexit.register(verify_error)
    # Walk two frames up and use that frame's local variable named "queue".
    caller_frame = inspect.currentframe().f_back
    queue_owner = caller_frame.f_back
    queue_owner.f_locals['queue'].put_nowait(obj)
def return_(obj):
    """Return *obj* unchanged (an identity function used as a test worker)."""
    return obj
# noinspection PyUnusedLocal
def throw(exception, *args):
    """Raise *exception*; any extra positional arguments are ignored."""
    raise exception
class TestFutureAPI(TestTimer, TestTimeout, unittest.TestCase):
    """Tests that drive ``asynchronous._Future`` objects directly."""

    CLASS = asynchronous._Future

    def test_valid_representation(self):
        # repr() follows the state: PENDING -> RUNNING -> CANCELLED when the
        # start method is called a second time.
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertRegex(repr(future), r'^<_Future at \d+ state=PENDING>$')
        future._set_running_or_notify_cancel()
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        future._set_running_or_notify_cancel()
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
        )
        # A finished call still shows the default TimeoutError until its
        # outcome has been collected by exception() or result().
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED raised TimeoutError>$'
        )
        self.assertIs(future.exception(), None)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED returned NoneType>$'
        )

    def test_error_representation(self):
        # cause_error puts its own object on the worker's queue, so the
        # worker's own put fails and the process ends in the ERROR state.
        # Here the injected object is None, which cannot be unpacked.
        future = self.instantiate(0.5, cause_error, (None,), {})
        future._set_running_or_notify_cancel()
        self.assertRaises(TypeError, future.result)
        self.assertIsInstance(future.exception(), TimeoutError)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=ERROR raised TimeoutError>$'
        )
        # Here the injected object is a well-formed (error, value) pair.
        future = self.instantiate(0.5, cause_error, ((False, 'okay'),), {})
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 'okay')
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=ERROR returned str>$'
        )

    def test_cancel(self):
        # Cancelling before the process was started raises AttributeError.
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertRaises(AttributeError, future.cancel)
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertTrue(future.cancelled())
        # Cancelling twice must invoke the done callback only once.
        future = self.instantiate(None, time.sleep, (0.1,), {})
        checker = set()
        future.add_done_callback(checker.add)
        future._set_running_or_notify_cancel()
        future.cancel()
        future.cancel()
        self.assertIs(checker.pop(), future)
        self.assertFalse(checker)

    def test_cancelled(self):
        # Not cancelled while pending, running, or after a normal finish.
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.cancelled())
        future._set_running_or_notify_cancel()
        self.assertFalse(future.cancelled())
        self.assertIs(future.result(), None)
        self.assertFalse(future.cancelled())
        # Cancelled after an explicit cancel().
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertTrue(future.cancelled())
        # Cancelled automatically once the 0.1 second timeout has passed.
        future = self.instantiate(0.1, time.sleep, (1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertTrue(future.cancelled())

    def test_running(self):
        # Running only between start and completion.
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.running())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.running())
        self.assertIs(future.result(), None)
        self.assertFalse(future.running())
        # Not running after an explicit cancel().
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertFalse(future.running())
        # Not running once the timeout has passed.
        future = self.instantiate(0.1, time.sleep, (1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertFalse(future.running())

    def test_done(self):
        # Done only after the result has been produced.
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.done())
        future._set_running_or_notify_cancel()
        self.assertFalse(future.done())
        self.assertIs(future.result(), None)
        self.assertTrue(future.done())
        # A call that raises (time.sleep(None) -> TypeError) is also done.
        future = self.instantiate(None, time.sleep, (None,), {})
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), TypeError)
        self.assertTrue(future.done())

    def test_result_immediate(self):
        # A one-mebibyte payload must come back intact.
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), data)
        # An exception raised by the call is re-raised by result(); it is
        # compared by type and args rather than by identity.
        test_exception = Exception('test')
        future = self.instantiate(None, throw, (test_exception,), {})
        future._set_running_or_notify_cancel()
        with self.assertRaises(Exception) as cm:
            future.result()
        self.assertIsInstance(cm.exception, type(test_exception))
        self.assertEqual(cm.exception.args, test_exception.args)

    def test_result_delay(self):
        # No timeout, no delay.
        future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.1)
        # No timeout, one-second delay.
        future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 5)
        self.stop_timer(1)
        # Half-second timeout, no delay: finishes in time.
        future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.1)
        # Half-second timeout, one-second delay: times out at about 0.5 s.
        future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertRaises(TimeoutError, future.result)
        self.stop_timer(0.5)

    def test_result_before_running(self):
        # result() is called before the future is started; another thread
        # starts it half a second later and the result must still arrive.
        future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
        delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
        self.start_timer()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.5)

    def test_exception_immediate(self):
        # exception() returns None for a call that succeeded.
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        # exception() returns (does not raise) the call's exception.
        test_exception = Exception('test')
        future = self.instantiate(None, throw, (test_exception,), {})
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), type(test_exception))
        self.assertEqual(future.exception().args, test_exception.args)

    def test_exception_delay(self):
        # Same four timing scenarios as test_result_delay, via exception().
        future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        self.stop_timer(0.1)
        future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        self.stop_timer(1)
        future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        self.stop_timer(0.1)
        future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), TimeoutError)
        self.assertFalse(future.exception().args)
        self.stop_timer(0.5)

    def test_exception_before_running(self):
        # exception() is called before the future is started; another
        # thread starts it half a second later.
        future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
        delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
        self.start_timer()
        self.assertIs(future.exception(), None)
        self.stop_timer(0.5)

    def test_valid_add_done_callback(self):
        # A callback added before completion runs once the outcome has been
        # collected.
        future = self.instantiate(None, time.sleep, (0,), {})
        requires_callback = {future}
        future.add_done_callback(requires_callback.remove)
        self.assertIn(future, requires_callback)
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        self.assertFalse(requires_callback)
        # A callback added after completion runs immediately.
        requires_callback.add(future)
        future.add_done_callback(requires_callback.remove)
        self.assertFalse(requires_callback)

    def test_error_add_done_callback(self):
        # One failing callback in the middle must not stop the others; its
        # exception is raised once all callbacks have run.
        future = self.instantiate(None, time.sleep, (0,), {})
        requires_callback = [{future} for _ in range(10)]
        callbacks = [s.remove for s in requires_callback]
        error = Exception()
        callbacks.insert(5, functools.partial(throw, error))
        for fn in callbacks:
            future.add_done_callback(fn)
        future._set_running_or_notify_cancel()
        with self.assertRaises(Exception) as cm:
            future.exception()
        self.assertIs(cm.exception, error)
        self.assertFalse(any(requires_callback))

    def test_set_running_or_notify_cancel(self):
        # The first call starts the future; the second call cancels it.
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.running() or future.done())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.running())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.cancelled())
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()