34

環境

私は最近、Code Review のレビュー用にタイマー クラスを投稿しました。1 つの単体テストが失敗するのを見たことがあるので、並行性のバグがあると直観していましたが、失敗を再現できませんでした。したがって、コードレビューへの私の投稿。

コード内のさまざまな競合状態を強調する素晴らしいフィードバックをいくつか受け取りました。(と思った)問題と解決策は理解しましたが、修正を行う前に、単体テストでバグを公開したかったのです。やってみると、難しいと思いました。さまざまなスタック交換の回答は、スレッドの実行を制御してバグを公開する必要があることを示唆しており、不自然なタイミングは必ずしも別のマシンに移植できるとは限りません。これは、私が解決しようとしていた問題を超えた多くの偶発的な複雑さのように思えました。

代わりに、python 用の最高の静的解析 (SA) ツールである PyLint を使用して、バグを見つけられるかどうかを確認しようとしましたが、できませんでした。人間はコード レビュー (本質的には SA) を通じてバグを見つけることができたのに、SA ツールでは見つけられなかったのはなぜですか?

Valgrind を Python で動作させることを恐れて(ヤクの毛刈りのように聞こえました)、最初にバグを再現せずにバグを修正することにしました。今、私はピクルスにいます。

これが今のコードです。

from threading import Timer, Lock
from time import time

class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass


class KitchenTimer(object):
    '''
    Loosely models a clockwork kitchen timer with the following differences:
        You can start the timer with arbitrary duration (e.g. 1.2 seconds).
        The timer calls back a given function when time's up.
        Querying the time remaining has 0.1 second accuracy.
    '''

    PRECISION_NUM_DECIMAL_PLACES = 1
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    TIMEUP  = "TIMEUP"

    def __init__(self):
        self._stateLock = Lock()
        with self._stateLock:
            self._state = self.STOPPED
            self._timeRemaining = 0

    def start(self, duration=1, whenTimeup=None):
        '''
        Starts the timer to count down from the given duration and call whenTimeup when time's up.
        '''
        with self._stateLock:
            if self.isRunning():
                raise AlreadyRunningError
            else:
                self._state = self.RUNNING
                self.duration = duration
                self._userWhenTimeup = whenTimeup
                self._startTime = time()
                self._timer = Timer(duration, self._whenTimeup)
                self._timer.start()

    def stop(self):
        '''
        Stops the timer, preventing whenTimeup callback.
        '''
        with self._stateLock:
            if self.isRunning():
                self._timer.cancel()
                self._state = self.STOPPED
                self._timeRemaining = self.duration - self._elapsedTime()
            else:
                raise NotRunningError()

    def isRunning(self):
        return self._state == self.RUNNING

    def isStopped(self):
        return self._state == self.STOPPED

    def isTimeup(self):
        return self._state == self.TIMEUP

    @property
    def timeRemaining(self):
        if self.isRunning():
            self._timeRemaining = self.duration - self._elapsedTime()
        return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)

    def _whenTimeup(self):
        with self._stateLock:
            self._state = self.TIMEUP
            self._timeRemaining = 0
            if callable(self._userWhenTimeup):
                self._userWhenTimeup()

    def _elapsedTime(self):
        return time() - self._startTime

質問

このコード例のコンテキストで、競合状態を明らかにし、修正し、修正されたことを証明するにはどうすればよいでしょうか?

エクストラポイント

特にこのコードではなく、他の実装や問題に適したテスト フレームワークの追加ポイント。

取り除く

私の結論は、特定された競合状態を再現するための技術的な解決策は、2 つのスレッドの同期を制御して、バグが発生する順序で確実に実行されるようにすることです。ここで重要な点は、それらがすでに特定された競合状態であるということです。競合状態を特定する最善の方法は、コードをコード レビューにかけ、より多くの専門家にコードを分析してもらうことです。

4

4 に答える 4

11

従来、マルチスレッド コードでの競合状態の強制はセマフォで行われるため、続行する前に、別のスレッドが何らかのエッジ状態に達するまでスレッドを強制的に待機させることができます。

たとえばstart、オブジェクトが既に実行されている場合、オブジェクトには呼び出されないことを確認するコードがあります。次のようにして、この条件が期待どおりに動作するように強制できます。

  • を開始するKitchenTimer
  • 実行中の状態でセマフォにタイマー ブロックを配置する
  • 別のスレッドで同じタイマーを開始する
  • キャッチAlreadyRunningError

これを行うには、KitchenTimer クラスを拡張する必要がある場合があります。正式な単体テストでは、重要なタイミングでブロックするように定義されたモック オブジェクトを使用することがよくあります。モック オブジェクトは、ここで扱うよりも大きなトピックですが、「python モック オブジェクト」をグーグルで検索すると、多くのドキュメントと多くの実装から選択できます。

コードを強制的にスローさせる方法は次のAlreadyRunningErrorとおりです。

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def start(self, duration=1, whenTimeUp=None):
        KitchenTimer.start(self, duration, whenTimeUp)
        with self._runningLock:
            print "waiting on _runningLock"
            self._runningLock.wait()

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

timer = TestKitchenTimer()

# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()

# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
    thread_2 = threading.Thread(target = timer.start, args = (10, None))
    thread_2.start()
except AlreadyRunningError:
    print "AlreadyRunningError"
    timer.resume()
    timer.stop()

コードを読んで、テストしたい境界条件をいくつか特定し、その条件を発生させるためにタイマーを一時停止する必要がある場所を考え、それを発生させるために条件、セマフォ、イベントなどを追加します。たとえば、タイマーが whenTimeUp コールバックを実行しているときに、別のスレッドがそれを停止しようとするとどうなりますか? _whenTimeUp に入るとすぐにタイマーを待機させることで、その状態を強制できます。

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def _whenTimeup(self):
        with self._runningLock:
            self._runningLock.wait()
        KitchenTimer._whenTimeup(self)

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

def TimeupCallback():
    print "TimeupCallback was called"

timer = TestKitchenTimer()

# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)

# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()

# Now allow the countdown thread to resume.
timer.resume()

テストしたいクラスをサブクラス化することは、テスト用にインストルメント化する素晴らしい方法ではありません: 基本的にすべてのメソッドをオーバーライドして、それぞれの競合状態をテストする必要があります。元のコードを実際にテストしていないことを確認してください。代わりに、セマフォを KitchenTimer オブジェクトに直接配置し、デフォルトで None に初期化しif testRunningLock is not None:、ロックを取得または待機する前にメソッドをチェックする方がクリーンな場合があります。次に、送信している実際のコードで競合を強制できます。

役立つかもしれないPythonモックフレームワークに関するいくつかの読書。実際、このコードのテストにモックが役立つかどうかはわかりません。ほとんど完全に自己完結型であり、多くの外部オブジェクトに依存していません。しかし、模擬チュートリアルでは、このような問題に触れることがあります。私はこれらのどれも使用していませんが、これらのドキュメントは始めるのに適した場所のようです:

于 2013-11-27T06:07:03.247 に答える
8

スレッド (非) セーフ コードをテストするための最も一般的な解決策は、多くのスレッドを開始し、最善を期待することです。私と他の人が抱えている問題は、これが偶然に依存し、テストが「重くなる」ことです。

少し前にこれに出くわしたので、力ずくではなく正確さを求めました。その結果、スレッドを首から首まで競争させることによって競合状態を引き起こすテストコードが得られます。

際どいコードの例

spam = []

def set_spam():
    spam[:] = foo()
    use(spam)

set_spamが複数のスレッドから呼び出された場合、 の変更と使用の間に競合状態が存在しますspam。一貫して再現してみましょう。

競合状態を引き起こす方法

class TriggeredThread(threading.Thread):
    def __init__(self, sequence=None, *args, **kwargs):
        self.sequence = sequence
        self.lock = threading.Condition()
        self.event = threading.Event()
        threading.Thread.__init__(self, *args, **kwargs)

    def __enter__(self):
        self.lock.acquire()
        while not self.event.is_set():
            self.lock.wait()
        self.event.clear()

    def __exit__(self, *args):
        self.lock.release()
        if self.sequence:
            next(self.sequence).trigger()

    def trigger(self):
        with self.lock:
            self.event.set()
            self.lock.notify()

次に、このスレッドの使用方法を示します。

spam = []  # Use a list to share values across threads.
results = []  # Register the results.

def set_spam():
    thread = threading.current_thread()
    with thread:  # Acquires the lock.
        # Set 'spam' to thread name
        spam[:] = [thread.name]
    # Thread 'releases' the lock upon exiting the context.
    # The next thread is triggered and this thread waits for a trigger.
    with thread:
        # Since each thread overwrites the content of the 'spam'
        # list, this should only result in True for the last thread.
        results.append(spam == [thread.name])

threads = [
    TriggeredThread(name='a', target=set_spam),
    TriggeredThread(name='b', target=set_spam),
    TriggeredThread(name='c', target=set_spam)]

# Create a shifted sequence of threads and share it among the threads.
thread_sequence = itertools.cycle(threads[1:] + threads[:1])
for thread in threads:
    thread.sequence = thread_sequence

# Start each thread
[thread.start() for thread in threads]
# Trigger first thread.
# That thread will trigger the next thread, and so on.
threads[0].trigger()
# Wait for each thread to finish.
[thread.join() for thread in threads]
# The last thread 'has won the race' overwriting the value
# for 'spam', thus [False, False, True].
# If set_spam were thread-safe, all results would be true.
assert results == [False, False, True], "race condition triggered"
assert results == [True, True, True], "code is thread-safe"

自分の状況に合わせて実装できるように、この構造について十分に説明したと思います。これは「追加ポイント」セクションに非常にうまく適合すると思います。

特にこのコードではなく、他の実装や問題に適したテスト フレームワークの追加ポイント。

競合状態の解決

共有変数

各スレッドの問題は、独自の方法で解決されます。上記の例では、スレッド間で値を共有することで競合状態を引き起こしました。モジュール属性などのグローバル変数を使用すると、同様の問題が発生する可能性があります。このような問題を解決する鍵は、スレッド ローカル ストレージを使用することです。

# The thread local storage is a global.
# This may seem weird at first, but it isn't actually shared among threads.
data = threading.local()
data.spam = []  # This list only exists in this thread.
results = []  # Results *are* shared though.

def set_spam():
    thread = threading.current_thread()
    # 'get' or set the 'spam' list. This actually creates a new list.
    # If the list was shared among threads this would cause a race-condition.
    data.spam = getattr(data, 'spam', [])
    with thread:
        data.spam[:] = [thread.name]
    with thread:
        results.append(data.spam == [thread.name])

# Start the threads as in the example above.

assert all(results)  # All results should be True.

同時読み取り/書き込み

一般的なスレッド化の問題は、複数のスレッドがデータ ホルダーに対して同時に読み取りおよび/または書き込みを行うという問題です。この問題は、読み書きロックを実装することで解決されます。読み書きロックの実際の実装は異なる場合があります。読み取り優先ロック、書き込み優先ロック、またはランダムに選択できます。

そのようなロック手法を説明する例が世の中にあると確信しています。これはすでにかなり長い答えなので、後で例を書くかもしれません。;-)

ノート

threading モジュールのドキュメントを見て、少し試してみてください。スレッドの問題はそれぞれ異なるため、異なる解決策が適用されます。

スレッド化については、Python GIL (Global Interpreter Lock) を参照してください。パフォーマンスを最適化する上で、スレッド化は実際には最良のアプローチではない可能性があることに注意することが重要です (ただし、これは目標ではありません)。このプレゼンテーションはかなり良いと思います: https://www.youtube.com/watch?v=zEaosS1U5qY

于 2013-11-27T09:50:45.887 に答える
4

多くのスレッドを使用してテストできます。

import sys, random, thread
def timeup():
    sys.stdout.write("Timer:: Up %f" % time())

def trdfunc(kt, tid):
    while True :
        sleep(1)
        if not kt.isRunning():
            if kt.start(1, timeup):
                sys.stdout.write("[%d]: started\n" % tid)
        else:
            if random.random() < 0.1:
                kt.stop()
                sys.stdout.write("[%d]: stopped\n" % tid)
        sys.stdout.write("[%d] remains %f\n" % ( tid, kt.timeRemaining))

kt = KitchenTimer()
kt.start(1, timeup)
for i in range(1, 100):
    thread.start_new_thread ( trdfunc, (kt, i) )
trdfunc(kt, 0)

私が見るいくつかの問題の問題:

  • スレッドがタイマーが実行されていないことを認識して開始しようとすると、コードは通常、テストと開始の間のコンテキストの切り替えにより例外を発生させます。例外を発生させるのは多すぎると思います。または、アトミックな testAndStart 関数を持つことができます

  • stop でも同様の問題が発生します。testAndStop 関数を実装できます。

  • timeRemaining関数からのこのコードでも:

    if self.isRunning():
       self._timeRemaining = self.duration - self._elapsedTime()
    

    ある種の原子性が必要です。おそらく isRunning をテストする前にロックを取得する必要があります。

このクラスをスレッド間で共有する場合は、これらの問題に対処する必要があります。

于 2013-11-21T12:18:33.967 に答える
3

一般に、これは実行可能な解決策ではありません。デバッガーを使用してこの競合状態を再現できます (コード内のいくつかの場所にブレークポイントを設定し、ブレークポイントの 1 つに到達したときよりも - スレッドをフリーズして別のブレークポイントに到達するまでコードを実行し、次にこのスレッドをフリーズして最初のスレッドをフリーズ解除します)。スレッドの場合、この手法を使用して任意の方法でスレッドの実行をインターリーブできます)。

問題は、スレッドとコードが多いほど、副作用をインターリーブする方法が増えることです。実際、指数関数的に成長します。一般的にテストする実行可能なソリューションはありません。いくつかの単純なケースでのみ可能です。

この問題の解決策はよく知られています。副作用を認識するコードを記述し、ロック、セマフォ、キューなどの同期プリミティブを使用して副作用を制御するか、可能であれば不変データを使用します。

おそらくより実用的な方法は、実行時チェックを使用して正しい呼び出し順序を強制することです。例(疑似コード):

class RacyObject:
    def __init__(self):
        self.__cnt = 0
        ...

    def isReadyAndLocked(self):
        acquire_object_lock
            if self.__cnt % 2 != 0:
                # another thread is ready to start the Job
                return False
            if self.__is_ready:
                self.__cnt += 1
                return True
            # Job is in progress or doesn't ready yet
            return False
        release_object_lock

    def doJobAndRelease(self):
        acquire_object_lock
            if self.__cnt % 2 != 1:
                raise RaceConditionDetected("Incorrect order")
            self.__cnt += 1
            do_job()
        release_object_lock

isReadyAndLockを呼び出す前にチェックしないと、このコードは例外をスローしますdoJobAndRelease。これは、1 つのスレッドのみを使用して簡単にテストできます。

obj = RacyObject()
...
# correct usage
if obj.isReadyAndLocked()
    obj.doJobAndRelease()
于 2013-11-27T08:51:08.340 に答える