私のプロジェクトで非常に重要な役割を果たしているキュー オブジェクトを持っていますが、それにバグがあるのは許せません。
その考え方は、その基本クラスである builtin に似ていQueue
ますが、メモリを保持するために、データまたはその一部を少なくともファイルに保存します。これにより速度が上がるはずなので、一部をメモリに保持することにしました。ここにコードを置いておきます。説明するよりも見た方が簡単かもしれません
やりたいことは奇妙に思えるかもしれませんが、多くの作業をキューに入れる必要があり、それを処理するよりもはるかに速くキューに入れる必要があり、標準のQueue
. Queue
できるだけ早く処理するデータの総量を知りたいので、最大サイズを設定してワーカーがキューに物を入れるのをブロックすることはできません。Queue
また、最初に合計を計算することはできませんが、データを調べるたびに合計が異なり、最後に合計が一致しないため、それをキューに入れずに戻って入力することはできません。
私の質問は、これを徹底的にテストして、アイテムがまだバッファまたはファイルにある場合、または complete が呼び出されてキューが空の場合に、アイテムが失われたり、さらに重要なことにゲッターでブロックされていないことを確認するにはどうすればよいかということです。
特定の入力に対して出力がどうあるべきかがわかっている場合、ユニットテストをテストしてセットアップするのが非常に簡単に見えるものもありますが、このようなものをテストする効果的な方法はよくわかりません。この種のことを単体テストでテストすることは可能ですか?
さまざまな数のアイテムのさまざまな速度でアイテムを入れたり取得したりするテスト プログラムをセットアップしましたが、問題ないように見えますが、.get
まだキューにあるアイテムでゲッターがブロックされているという証拠を見たので、私は問題があると信じるようになります。
これを徹底的にテストして残りのバグを見つけたり、バグがないことをほぼ確実にする最善の方法は何ですか?
編集
次のコードで使用するものと同様のテストデータを生成できます。プロジェクトの特定の条件下でのファイルのチェックサムしかありません。それ以外の場合はNone
、以下のコードで時々生成して試してみますそれをシミュレートする
import os
import hashlib
def hash(f_obj):
md5 = hashlib.md5()
while True:
data = f_obj.read(8192)
if not data:
break
md5.update(data)
return md5.hexdigest()
def produce(at_once,total_items):
items=[]
count=0
for dir,folders,files in os.walk("/"):
for f in files:
try:
f_path= os.path.join(dir,f)
f_size= os.path.getsize(f_path)
f_mtime= os.path.getmtime(f_path)
with open(f_path) as file_obj:
f_hash= hash(file_obj) if f_size%2 else None
items.append((f_path,f_size,f_mtime,f_hash))
count+=1
except Exception as err:
print "#####",err,"#####"
if len(items) >= at_once:
yield items
items=[]
if count >= total_items:
break
if items:
yield items