8

unittest フレームワークを使用して、マルチスレッドの python コード、外部ハードウェア、および組み込み C の統合テストを自動化しています。統合テストのための unittesting フレームワークの露骨な乱用にもかかわらず、それは非常にうまく機能します。1 つの問題を除いて: 生成されたスレッドのいずれかから例外が発生した場合、テストを失敗させる必要があります。これは unittest フレームワークで可能ですか?

単純だが実行できない解決策は、a) コードをリファクタリングしてマルチスレッドを回避するか、b) 各スレッドを個別にテストすることです。コードが外部ハードウェアと非同期に対話するため、それはできません。また、例外をメインの unittest スレッドに転送するために、ある種のメッセージ パッシングを実装することも検討しました。これには、テスト対象のコードに大幅なテスト関連の変更が必要になるため、それは避けたいと考えています。

例の時間。x.ExceptionRaiser クラスを変更せずに、my_thread で発生した例外で失敗するように以下のテスト スクリプトを変更できますか?

import unittest
import x

class Test(unittest.TestCase):
    def test_x(self):
        my_thread = x.ExceptionRaiser()
        # Test case should fail when thread is started and raises
        # an exception.
        my_thread.start()
        my_thread.join()

if __name__ == '__main__':
    unittest.main()
4

3 に答える 3

6

最初はsys.excepthook、解決策のように見えました。これは、キャッチされない例外がスローされるたびに呼び出されるグローバル フックです。

残念ながら、これは機能しません。なんで?画面に表示される素敵なトレースバックを出力するコードで関数を適切にthreadingラップrunします (常にどのように表示されるかに注意してくださいException in thread {Name of your thread here}。これがその方法です)。

Python 3.8 から、これを機能させるためにオーバーライドできる関数があります: threading.excepthook

... threading.excepthook() をオーバーライドして、Thread.run() によって発生したキャッチされていない例外の処理方法を制御できます

どうしようか?この関数をロジックに置き換えてください。

Python >= 3.8 の場合

import traceback
import threading 
import os


class GlobalExceptionWatcher(object):
    def _store_excepthook(self, args):
        '''
        Uses as an exception handlers which stores any uncaught exceptions.
        '''
        self.__org_hook(args)
        formated_exc = traceback.format_exception(args.exc_type, args.exc_value, args.exc_traceback)
        self._exceptions.append('\n'.join(formated_exc))
        return formated_exc

    def __enter__(self):
        '''
        Register us to the hook.
        '''
        self._exceptions = []
        self.__org_hook = threading.excepthook
        threading.excepthook = self._store_excepthook

    def __exit__(self, type, value, traceback):
        '''
        Remove us from the hook, assure no exception were thrown.
        '''
        threading.excepthook = self.__org_hook
        if len(self._exceptions) != 0:
            tracebacks = os.linesep.join(self._exceptions)
            raise Exception(f'Exceptions in other threads: {tracebacks}')

古いバージョンの Python の場合、これはもう少し複雑です。簡単に言えば、threading結節には、次のような行に沿って何かを行う文書化されていないインポートがあるようです。

threading._format_exc = traceback.format_exc

驚くことではありませんが、この関数は、スレッドのrun関数から例外がスローされたときにのみ呼び出されます。

したがって、python <= 3.7の場合

import threading 
import os

class GlobalExceptionWatcher(object):
    def _store_excepthook(self):
        '''
        Uses as an exception handlers which stores any uncaught exceptions.
        '''
        formated_exc = self.__org_hook()
        self._exceptions.append(formated_exc)
        return formated_exc
        
    def __enter__(self):
        '''
        Register us to the hook.
        '''
        self._exceptions = []
        self.__org_hook = threading._format_exc
        threading._format_exc = self._store_excepthook
        
    def __exit__(self, type, value, traceback):
        '''
        Remove us from the hook, assure no exception were thrown.
        '''
        threading._format_exc = self.__org_hook
        if len(self._exceptions) != 0:
            tracebacks = os.linesep.join(self._exceptions)
            raise Exception('Exceptions in other threads: %s' % tracebacks)

使用法:

my_thread = x.ExceptionRaiser()
# will fail when thread is started and raises an exception.
with GlobalExceptionWatcher():
    my_thread.start()
    my_thread.join()
            

それでもjoin自分で行う必要がありますが、終了時に with ステートメントのコンテキスト マネージャーが他のスレッドでスローされた例外をチェックし、適切に例外を発生させます。


コードは「現状のまま」提供され、明示または黙示を問わず、いかなる種類の保証もありません

これは文書化されていない、一種の恐ろしいハックです。Linux と Windows でテストしましたが、動作するようです。ご自身の責任で使用してください。

于 2012-09-19T13:37:43.237 に答える
1

私はこの問題に遭遇しましたが、私が思いついた唯一の解決策は、スレッドをサブクラス化して、例外をキャッチせずに終了するかどうかの属性を含めることです。

from threading import Thread

class ErrThread(Thread):
    """                                                                                                                                                                                               
    A subclass of Thread that will log store exceptions if the thread does                                                                                                                            
    not exit normally                                                                                                                                                                                 
    """
    def run(self):
        try:
            Thread.run(self)
        except Exception as self.err:
            pass
        else:
            self.err = None


class TaskQueue(object):
    """                                                                                                                                                                                               
    A utility class to run ErrThread objects in parallel and raises and exception                                                                                                                     
    in the event that *any* of them fail.                                                                                                                                                             
    """

    def __init__(self, *tasks):

        self.threads = []

        for t in tasks:
            try:
                self.threads.append(ErrThread(**t)) ## passing in a dict of target and args
            except TypeError:
                self.threads.append(ErrThread(target=t))

    def run(self):

        for t in self.threads:
            t.start()
        for t in self.threads:
            t.join()
            if t.err:
                raise Exception('Thread %s failed with error: %s' % (t.name, t.err))
于 2012-09-29T09:11:58.373 に答える