86

ループしているスレッドにループを停止するように指示する適切な方法は何ですか?

threading.Thread別のクラスで指定されたホストに ping を実行するかなり単純なプログラムがあります。このクラスでは 60 秒間スリープし、アプリケーションが終了するまで再び実行されます。

wx.Frameループしているスレッドに停止を求めるために、「停止」ボタンを実装したいと思います。スレッドをすぐに終了する必要はありません。スレッドが起動したら、ループを停止できます。

これが私のthreadingクラスです (注: ループはまだ実装していませんが、PingAssets の run メソッドに該当する可能性があります)。

class PingAssets(threading.Thread):
    def __init__(self, threadNum, asset, window):
        threading.Thread.__init__(self)
        self.threadNum = threadNum
        self.window = window
        self.asset = asset

    def run(self):
        config = controller.getConfig()
        fmt = config['timefmt']
        start_time = datetime.now().strftime(fmt)
        try:
            if onlinecheck.check_status(self.asset):
                status = "online"
            else:
                status = "offline"
        except socket.gaierror:
            status = "an invalid asset tag."
        msg =("{}: {} is {}.   \n".format(start_time, self.asset, status))
        wx.CallAfter(self.window.Logger, msg)

そして、私の wxPyhton フレームでは、この関数を [スタート] ボタンから呼び出しています。

def CheckAsset(self, asset):
        self.count += 1
        thread = PingAssets(self.count, asset, self)
        self.threads.append(thread)
        thread.start()
4

7 に答える 7

144

ねじ止め機能

をサブクラス化する代わりにthreading.Thread、フラグで停止できるように関数を変更できます。

実行中の関数にアクセスできるオブジェクトが必要で、実行を停止するフラグを設定します。

オブジェクトを使用できthreading.currentThread()ます。

import threading
import time


def doit(arg):
    t = threading.currentThread()
    while getattr(t, "do_run", True):
        print ("working on %s" % arg)
        time.sleep(1)
    print("Stopping as you wish.")


def main():
    t = threading.Thread(target=doit, args=("task",))
    t.start()
    time.sleep(5)
    t.do_run = False
    

if __name__ == "__main__":
    main()

秘訣は、実行中のスレッドに追加のプロパティを添付できることです。このソリューションは、次の前提に基づいています。

  • スレッドにはデフォルト値のプロパティ「do_run」がありますTrue
  • 駆動する親プロセスは、開始されたスレッドにプロパティ「do_run」を割り当てることができますFalse

コードを実行すると、次の出力が得られます。

$ python stopthread.py                                                        
working on task
working on task
working on task
working on task
working on task
Stopping as you wish.

Pill to kill - イベントの使用

他の代替手段はthreading.Event、関数の引数として使用することです。デフォルトFalseでは ですが、外部プロセスはそれを「設定」( にTrue) でき、関数は関数を使用してそれについて学習できwait(timeout)ます。

タイムアウトをwaitゼロにすることもできますが、スリープ タイマーとして使用することもできます (以下で使用)。

def doit(stop_event, arg):
    while not stop_event.wait(1):
        print ("working on %s" % arg)
    print("Stopping as you wish.")


def main():
    pill2kill = threading.Event()
    t = threading.Thread(target=doit, args=(pill2kill, "task"))
    t.start()
    time.sleep(5)
    pill2kill.set()
    t.join()

編集:Python 3.6でこれを試しました。stop_event.wait()解放されるまでイベント (および while ループ) をブロックします。ブール値を返しません。stop_event.is_set()代わりに作品を使用します。

1 つの錠剤で複数のスレッドを停止する

一度に複数のスレッドを停止する必要がある場合は、1 つのピルですべてが機能するため、Pill to kill の利点がよくわかります。

doitまったく変更されませんmain。スレッドの処理が少し異なるだけです。

def main():
    pill2kill = threading.Event()
    tasks = ["task ONE", "task TWO", "task THREE"]

    def thread_gen(pill2kill, tasks):
        for task in tasks:
            t = threading.Thread(target=doit, args=(pill2kill, task))
            yield t

    threads = list(thread_gen(pill2kill, tasks))
    for thread in threads:
        thread.start()
    time.sleep(5)
    pill2kill.set()
    for thread in threads:
        thread.join()
于 2016-04-08T12:25:44.430 に答える
14

Stack に関する他の質問を読みましたが、クラス間のコミュニケーションについてはまだ少し混乱していました。これが私がそれにアプローチした方法です:

リストを使用して、すべてのスレッドを__init__wxFrame クラスのメソッドに保持します。self.threads = []

Pythonでループスレッドを停止する方法で推奨されているように? Trueスレッドクラスを初期化するときに設定されるスレッドクラスでシグナルを使用します。

class PingAssets(threading.Thread):
    def __init__(self, threadNum, asset, window):
        threading.Thread.__init__(self)
        self.threadNum = threadNum
        self.window = window
        self.asset = asset
        self.signal = True

    def run(self):
        while self.signal:
             do_stuff()
             sleep()

スレッドを反復処理することで、これらのスレッドを停止できます。

def OnStop(self, e):
        for t in self.threads:
            t.signal = False
于 2013-08-02T13:45:32.033 に答える
3

私は別のアプローチをとっていました。Thread クラスをサブクラス化し、コンストラクターで Event オブジェクトを作成しました。次に、カスタムの join() メソッドを作成しました。このメソッドは、最初にこのイベントを設定してから、それ自体の親のバージョンを呼び出します。

これが私のクラスです。wxPythonアプリでシリアルポート通信に使用しています:

import wx, threading, serial, Events, Queue

class PumpThread(threading.Thread):

    def __init__ (self, port, queue, parent):
        super(PumpThread, self).__init__()
        self.port = port
        self.queue = queue
        self.parent = parent

        self.serial = serial.Serial()
        self.serial.port = self.port
        self.serial.timeout = 0.5
        self.serial.baudrate = 9600
        self.serial.parity = 'N'

        self.stopRequest = threading.Event()

    def run (self):
        try:
            self.serial.open()
        except Exception, ex:
            print ("[ERROR]\tUnable to open port {}".format(self.port))
            print ("[ERROR]\t{}\n\n{}".format(ex.message, ex.traceback))
            self.stopRequest.set()
        else:
            print ("[INFO]\tListening port {}".format(self.port))
            self.serial.write("FLOW?\r")

        while not self.stopRequest.isSet():
            msg = ''
            if not self.queue.empty():
                try:
                    command = self.queue.get()
                    self.serial.write(command)
                except Queue.Empty:
                    continue

            while self.serial.inWaiting():
                char = self.serial.read(1)
                if '\r' in char and len(msg) > 1:
                    char = ''
                    #~ print('[DATA]\t{}'.format(msg))
                    event = Events.PumpDataEvent(Events.SERIALRX, wx.ID_ANY, msg)
                    wx.PostEvent(self.parent, event)
                    msg = ''
                    break
                msg += char
        self.serial.close()

    def join (self, timeout=None):
        self.stopRequest.set()
        super(PumpThread, self).join(timeout)

    def SetPort (self, serial):
        self.serial = serial

    def Write (self, msg):
        if self.serial.is_open:
            self.queue.put(msg)
        else:
            print("[ERROR]\tPort {} is not open!".format(self.port))

    def Stop(self):
        if self.isAlive():
            self.join()

Queue はメッセージをポートに送信するために使用され、メイン ループは応答を返します。最終行の文字が異なるため、serial.readline() メソッドを使用していません。また、io クラスの使用法が面倒すぎることがわかりました。

于 2018-01-17T10:37:41.390 に答える
0

threading.Threadスレッド機能をカプセル化するために、から派生したクラスがあると便利です。run()このクラスのオーバーライドされたバージョンで独自のメイン ループを提供するだけです。呼び出しは、オブジェクトのメソッドが別のスレッドで呼び出されるようにstart()手配します。run()

メイン ループ内で、athreading.Eventが設定されているかどうかを定期的に確認します。このようなイベントはスレッドセーフです。

このクラス内には、基本クラスjoin()のメソッドを呼び出す前に停止イベント オブジェクトを設定する独自のメソッドがあります。join()必要に応じて、基本クラスのメソッドに渡される時間値をjoin()使用して、スレッドが短時間で終了するようにすることができます。

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, sleep_time=0.1):
        self._stop_event = threading.Event()
        self._sleep_time = sleep_time
        """call base class constructor"""
        super().__init__()

    def run(self):
        """main control loop"""
        while not self._stop_event.isSet():
            #do work
            print("hi")
            self._stop_event.wait(self._sleep_time)

    def join(self, timeout=None):
        """set stop event and join within a given time period"""
        self._stop_event.set()
        super().join(timeout)


if __name__ == "__main__":
    t = MyThread()
    t.start()

    time.sleep(5)

    t.join(1) #wait 1s max

をチェックする前にメイン ループ内で少しスリープ状態にすることは、threading.Event継続的にループするよりも CPU への負荷が少なくなります。デフォルトのスリープ時間 (0.1 秒など) を設定できますが、コンストラクターで値を渡すこともできます。

于 2021-01-25T08:18:47.920 に答える