フラグ/セマフォなどを設定/チェックせずに実行中のスレッドを終了することは可能ですか?
29 に答える
Python やどの言語でも、スレッドを突然強制終了するのは一般的に悪いパターンです。次のケースを考えてください。
- スレッドは、適切に閉じる必要がある重要なリソースを保持しています
- スレッドは、同様に強制終了する必要がある他のいくつかのスレッドを作成しました。
余裕がある場合 (独自のスレッドを管理している場合)、これを処理する良い方法は、各スレッドが定期的にチェックして終了する時間かどうかを確認する exit_request フラグを設定することです。
例えば:
import threading
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, *args, **kwargs):
super(StoppableThread, self).__init__(*args, **kwargs)
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
このコードではstop()
、スレッドを終了したいときにスレッドを呼び出し、 を使用してスレッドが適切に終了するのを待つ必要がありますjoin()
。スレッドは定期的に停止フラグをチェックする必要があります。
ただし、本当にスレッドを強制終了する必要がある場合もあります。たとえば、長時間の呼び出しでビジーな外部ライブラリをラップしていて、それを中断したい場合です。
次のコードでは、Python スレッドで例外を発生させることができます (いくつかの制限があります)。
def _async_raise(tid, exctype):
'''Raises an exception in the threads with id tid'''
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# "if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
raise SystemError("PyThreadState_SetAsyncExc failed")
class ThreadWithExc(threading.Thread):
'''A thread class that supports raising an exception in the thread from
another thread.
'''
def _get_my_tid(self):
"""determines this (self's) thread id
CAREFUL: this function is executed in the context of the caller
thread, to get the identity of the thread represented by this
instance.
"""
if not self.isAlive():
raise threading.ThreadError("the thread is not active")
# do we have it cached?
if hasattr(self, "_thread_id"):
return self._thread_id
# no, look for it in the _active dict
for tid, tobj in threading._active.items():
if tobj is self:
self._thread_id = tid
return tid
# TODO: in python 2.6, there's a simpler way to do: self.ident
raise AssertionError("could not determine the thread's id")
def raiseExc(self, exctype):
"""Raises the given exception type in the context of this thread.
If the thread is busy in a system call (time.sleep(),
socket.accept(), ...), the exception is simply ignored.
If you are sure that your exception should terminate the thread,
one way to ensure that it works is:
t = ThreadWithExc( ... )
...
t.raiseExc( SomeException )
while t.isAlive():
time.sleep( 0.1 )
t.raiseExc( SomeException )
If the exception is to be caught by the thread, you need a way to
check that your thread has caught it.
CAREFUL: this function is executed in the context of the
caller thread, to raise an exception in the context of the
thread represented by this instance.
"""
_async_raise( self._get_my_tid(), exctype )
(Tomer Filiba による Killable Threadsに基づいています。 の戻り値に関する引用は、古いバージョンの PythonPyThreadState_SetAsyncExc
からのもののようです。)
ドキュメントに記載されているように、これは特効薬ではありません。スレッドが Python インタープリターの外部でビジー状態の場合、割り込みをキャッチできないためです。
このコードの適切な使用パターンは、スレッドに特定の例外をキャッチさせ、クリーンアップを実行させることです。そうすれば、タスクを中断しても適切なクリーンアップを行うことができます。
multiprocessing.Process
缶_p.terminate()
スレッドを強制終了したいが、フラグ/ロック/シグナル/セマフォ/イベント/その他を使用したくない場合は、スレッドを本格的なプロセスに昇格させます。少数のスレッドのみを使用するコードの場合、オーバーヘッドはそれほど悪くありません。
たとえば、これはブロッキング I/O を実行するヘルパー「スレッド」を簡単に終了するのに便利です。
threading.Thread
変換は簡単です。関連するコードでは、すべてをmultiprocessing.Process
すべてqueue.Queue
に置き換えmultiprocessing.Queue
、必要な呼び出しを親プロセスに追加p.terminate()
します。これは、その子を強制終了したいプロセスですp
のPython ドキュメントをmultiprocessing
参照してください。
例:
import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()
# Terminate the process
proc.terminate() # sends a SIGTERM
それを行う公式の API はありません。
pthread_kill や TerminateThread など、プラットフォーム API を使用してスレッドを強制終了する必要があります。このような API には、たとえば pythonwin や ctypes を介してアクセスできます。
これは本質的に安全ではないことに注意してください。(スタック フレームのローカル変数がガベージになるため) 収集不能なガベージが発生する可能性が高く、強制終了された時点でスレッドが GIL を持っている場合、デッドロックが発生する可能性があります。
プログラム全体を終了しようとしている場合は、スレッドを「デーモン」として設定できます。Thread.daemonを参照
他の人が述べたように、標準は停止フラグを設定することです。軽量なもの (Thread のサブクラス化やグローバル変数なし) の場合、ラムダ コールバックはオプションです。( の括弧に注意してくださいif stop()
。)
import threading
import time
def do_work(id, stop):
print("I am thread", id)
while True:
print("I am thread {} doing something".format(id))
if stop():
print(" Exiting loop.")
break
print("Thread {}, signing off".format(id))
def main():
stop_threads = False
workers = []
for id in range(0,3):
tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
workers.append(tmp)
tmp.start()
time.sleep(3)
print('main: done sleeping; time to stop the threads.')
stop_threads = True
for worker in workers:
worker.join()
print('Finis.')
if __name__ == '__main__':
main()
常にフラッシュする関数 ( ) に置き換えるprint()
と、シェル出力の精度が向上する場合があります。pr()
sys.stdout.flush()
(Windows/Eclipse/Python3.3 でのみテスト済み)
Python では、スレッドを直接 kill することはできません。
スレッド (!) が本当に必要ない場合は、スレッド化パッケージを使用する代わりに、マルチプロセッシングパッケージを使用すること ができます。ここで、プロセスを強制終了するには、次のメソッドを呼び出すだけです。
yourProcess.terminate() # kill the process!
Python はプロセスを強制終了します (Unix では SIGTERM シグナルを介して、Windows ではTerminateProcess()
呼び出しを介して)。キューやパイプを使用している場合は注意してください。(キュー/パイプ内のデータが破損する可能性があります)
と は、それぞれ と とまったく同じように機能するmultiprocessing.Event
ことに注意してください。実際、前者は後者のクローンです。multiprocessing.Semaphore
threading.Event
threading.Semaphore
本当にスレッドを使用する必要がある場合、それを直接強制終了する方法はありません。ただし、できることは「デーモンスレッド」を使用することです。実際、Python では、スレッドにdaemonのフラグを立てることができます。
yourThread.daemon = True # set the Thread as a "daemon thread"
生きている非デーモン スレッドがなくなると、メイン プログラムは終了します。つまり、メイン スレッド (もちろん、非デーモン スレッド) が操作を終了すると、まだいくつかのデーモン スレッドが動作していても、プログラムは終了します。
メソッドが呼び出されるdaemon
前にスレッドを設定する必要があることに注意してください。start()
もちろん、 とdaemon
一緒でも使用できますし、使用する必要がありますmultiprocessing
。ここでは、メイン プロセスが終了するときに、すべてのデーモンの子プロセスを終了しようとします。
sys.exit()
最後に、とos.kill()
は選択ではないことに注意してください。
これは thread2 に基づいています -- killable スレッド (Python レシピ)
ctypes を介してのみ利用可能な PyThreadState_SetasyncExc() を呼び出す必要があります。
これは Python 2.7.3 でのみテストされていますが、他の最近の 2.x リリースでも動作する可能性があります。
import ctypes
def terminate_thread(thread):
"""Terminates a python thread from another thread.
:param thread: a threading.Thread instance
"""
if not thread.isAlive():
return
exc = ctypes.py_object(SystemExit)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), exc)
if res == 0:
raise ValueError("nonexistent thread id")
elif res > 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
スレッドに協力せずにスレッドを強制終了してはいけません。
スレッドを強制終了すると、try/finally ブロックの設定が保証されなくなるため、ロックがロックされたままになったり、ファイルが開いたりする可能性があります。
強制的にスレッドを強制終了することが良い考えであると主張できる唯一の場合は、プログラムを高速に終了することですが、単一のスレッドを強制終了することはできません。
スレッドを終了するスレッドにトレースをインストールすることで、スレッドを強制終了できます。可能な実装の 1 つについては、添付のリンクを参照してください。
スレッドを殺さない方が良いです。スレッドのサイクルに「try」ブロックを導入し、スレッドを停止したいときに例外をスローする方法があります (たとえば、for/while/... を停止する break/return/...)。私は自分のアプリでこれを使用しましたが、うまくいきます...
Thread.stop
次のコード例に示すように、メソッドを実装することは間違いなく可能です。
import sys
import threading
import time
class StopThread(StopIteration):
pass
threading.SystemExit = SystemExit, StopThread
class Thread2(threading.Thread):
def stop(self):
self.__stop = True
def _bootstrap(self):
if threading._trace_hook is not None:
raise ValueError('Cannot run thread with tracing!')
self.__stop = False
sys.settrace(self.__trace)
super()._bootstrap()
def __trace(self, frame, event, arg):
if self.__stop:
raise StopThread()
return self.__trace
class Thread3(threading.Thread):
def _bootstrap(self, stop_thread=False):
def stop():
nonlocal stop_thread
stop_thread = True
self.stop = stop
def tracer(*_):
if stop_thread:
raise StopThread()
return tracer
sys.settrace(tracer)
super()._bootstrap()
###############################################################################
def main():
test1 = Thread2(target=printer)
test1.start()
time.sleep(1)
test1.stop()
test1.join()
test2 = Thread2(target=speed_test)
test2.start()
time.sleep(1)
test2.stop()
test2.join()
test3 = Thread3(target=speed_test)
test3.start()
time.sleep(1)
test3.stop()
test3.join()
def printer():
while True:
print(time.time() % 1)
time.sleep(0.1)
def speed_test(count=0):
try:
while True:
count += 1
except StopThread:
print('Count =', count)
if __name__ == '__main__':
main()
このThread3
クラスは、クラスよりも約 33% 高速にコードを実行するようですThread2
。
from ctypes import *
pthread = cdll.LoadLibrary("libpthread-2.15.so")
pthread.pthread_cancel(c_ulong(t.ident))
tはあなたのThread
オブジェクトです。
Python のソース (Modules/threadmodule.c
および) を読むと、 が型であるPython/thread_pthread.h
ことがわかります。そのため、Python の使用で実行できることは何でも実行できます。Thread.ident
pthread_t
pthread
libpthread
カスタマイズされた関数で KillableThread サブクラスを作成する @SCB のアイデア (まさに私が必要としていたもの) を構築するためだけに:
from threading import Thread, Event
class KillableThread(Thread):
def __init__(self, sleep_interval=1, target=None, name=None, args=(), kwargs={}):
super().__init__(None, target, name, args, kwargs)
self._kill = Event()
self._interval = sleep_interval
print(self._target)
def run(self):
while True:
# Call custom function with arguments
self._target(*self._args)
# If no kill signal is set, sleep for the interval,
# If kill signal comes in while sleeping, immediately
# wake up and handle
is_killed = self._kill.wait(self._interval)
if is_killed:
break
print("Killing Thread")
def kill(self):
self._kill.set()
if __name__ == '__main__':
def print_msg(msg):
print(msg)
t = KillableThread(10, print_msg, args=("hello world"))
t.start()
time.sleep(6)
print("About to kill thread")
t.kill()
当然のことながら、@SBC と同様に、スレッドは新しいループの実行を待って停止することはありません。この例では、スレッドが完了するまでさらに 4 秒待つのではなく、「スレッドを強制終了しようとしています」の直後に「スレッドを強制終了しています」というメッセージが表示されます (既に 6 秒間スリープしているため)。
KillableThread コンストラクターの 2 番目の引数は、カスタム関数 (ここでは print_msg) です。Args 引数は、関数 (("hello world")) を呼び出すときに使用される引数です。
かなり古いものですが、これは一部の人にとっては便利な解決策かもしれません:
スレッドのモジュール機能を拡張する小さなモジュール -- あるスレッドが別のスレッドのコンテキストで例外を発生させることを可能にします。を上げること
SystemExit
で、最終的に Python スレッドを強制終了できます。
import threading
import ctypes
def _async_raise(tid, excobj):
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
if res == 0:
raise ValueError("nonexistent thread id")
elif res > 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
raise SystemError("PyThreadState_SetAsyncExc failed")
class Thread(threading.Thread):
def raise_exc(self, excobj):
assert self.isAlive(), "thread must be started"
for tid, tobj in threading._active.items():
if tobj is self:
_async_raise(tid, excobj)
return
# the thread was alive when we entered the loop, but was not found
# in the dict, hence it must have been already terminated. should we raise
# an exception here? silently ignore?
def terminate(self):
# must raise the SystemExit type, instead of a SystemExit() instance
# due to a bug in PyThreadState_SetAsyncExc
self.raise_exc(SystemExit)
そのため、「スレッドが別のスレッドのコンテキストで例外を発生させる」ことができ、このようにして、終了したスレッドは中止フラグを定期的にチェックすることなく終了を処理できます。
ただし、元のソースによると、このコードにはいくつかの問題があります。
- この例外は、Python バイトコードを実行する場合にのみ発生します。スレッドがネイティブ/組み込みのブロッキング関数を呼び出す場合、実行が Python コードに戻ったときにのみ例外が発生します。
- 組み込み関数が PyErr_Clear() を内部的に呼び出す場合にも問題があり、保留中の例外が事実上キャンセルされます。もう一度上げてみることができます。
- 安全に発生できるのは例外タイプのみです。例外インスタンスは予期しない動作を引き起こす可能性が高いため、制限されています。
- 例: t1.raise_exc(TypeError) であり、t1.raise_exc(TypeError("blah")) ではありません。
- 私見はバグです。私はそれをバグとして報告しました。詳細については、http://mail.python.org/pipermail/python-dev/2006-August/068158.html を ご覧ください。
- この関数を組み込みのスレッド モジュールで公開するように依頼しましたが、ctypes は標準ライブラリ (2.5 以降) になり、この
機能は実装に依存しない可能性が高いため、公開しないままにすることができます
。
setDaemon(True) でサブスレッドを開始します。
def bootstrap(_filename):
mb = ModelBootstrap(filename=_filename) # Has many Daemon threads. All get stopped automatically when main thread is stopped.
t = threading.Thread(target=bootstrap,args=('models.conf',))
t.setDaemon(False)
while True:
t.start()
time.sleep(10) # I am just allowing the sub-thread to run for 10 sec. You can listen on an event to stop execution.
print('Thread stopped')
break