環境
私は最近、Code Review のレビュー用にタイマー クラスを投稿しました。1 つの単体テストが失敗するのを見たことがあるので、並行性のバグがあると直観していましたが、失敗を再現できませんでした。したがって、コードレビューへの私の投稿。
コード内のさまざまな競合状態を強調する素晴らしいフィードバックをいくつか受け取りました。(と思った)問題と解決策は理解しましたが、修正を行う前に、単体テストでバグを公開したかったのです。やってみると、難しいと思いました。さまざまなスタック交換の回答は、スレッドの実行を制御してバグを公開する必要があることを示唆しており、不自然なタイミングは必ずしも別のマシンに移植できるとは限りません。これは、私が解決しようとしていた問題を超えた多くの偶発的な複雑さのように思えました。
代わりに、python 用の最高の静的解析 (SA) ツールである PyLint を使用して、バグを見つけられるかどうかを確認しようとしましたが、できませんでした。人間はコード レビュー (本質的には SA) を通じてバグを見つけることができたのに、SA ツールでは見つけられなかったのはなぜですか?
Valgrind を Python で動作させることを恐れて(ヤクの毛刈りのように聞こえました)、最初にバグを再現せずにバグを修正することにしました。今、私はピクルスにいます。
これが今のコードです。
from threading import Timer, Lock
from time import time
class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass
class KitchenTimer(object):
'''
Loosely models a clockwork kitchen timer with the following differences:
You can start the timer with arbitrary duration (e.g. 1.2 seconds).
The timer calls back a given function when time's up.
Querying the time remaining has 0.1 second accuracy.
'''
PRECISION_NUM_DECIMAL_PLACES = 1
RUNNING = "RUNNING"
STOPPED = "STOPPED"
TIMEUP = "TIMEUP"
def __init__(self):
self._stateLock = Lock()
with self._stateLock:
self._state = self.STOPPED
self._timeRemaining = 0
def start(self, duration=1, whenTimeup=None):
'''
Starts the timer to count down from the given duration and call whenTimeup when time's up.
'''
with self._stateLock:
if self.isRunning():
raise AlreadyRunningError
else:
self._state = self.RUNNING
self.duration = duration
self._userWhenTimeup = whenTimeup
self._startTime = time()
self._timer = Timer(duration, self._whenTimeup)
self._timer.start()
def stop(self):
'''
Stops the timer, preventing whenTimeup callback.
'''
with self._stateLock:
if self.isRunning():
self._timer.cancel()
self._state = self.STOPPED
self._timeRemaining = self.duration - self._elapsedTime()
else:
raise NotRunningError()
def isRunning(self):
return self._state == self.RUNNING
def isStopped(self):
return self._state == self.STOPPED
def isTimeup(self):
return self._state == self.TIMEUP
@property
def timeRemaining(self):
if self.isRunning():
self._timeRemaining = self.duration - self._elapsedTime()
return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)
def _whenTimeup(self):
with self._stateLock:
self._state = self.TIMEUP
self._timeRemaining = 0
if callable(self._userWhenTimeup):
self._userWhenTimeup()
def _elapsedTime(self):
return time() - self._startTime
質問
このコード例のコンテキストで、競合状態を明らかにし、修正し、修正されたことを証明するにはどうすればよいでしょうか?
エクストラポイント
特にこのコードではなく、他の実装や問題に適したテスト フレームワークの追加ポイント。
取り除く
私の結論は、特定された競合状態を再現するための技術的な解決策は、2 つのスレッドの同期を制御して、バグが発生する順序で確実に実行されるようにすることです。ここで重要な点は、それらがすでに特定された競合状態であるということです。競合状態を特定する最善の方法は、コードをコード レビューにかけ、より多くの専門家にコードを分析してもらうことです。