あなたがどのライブラリを使用しているのか正確にはわかりません.「これはキューを構築しています...」と言うとき、あなたが何を指しているのかわかりません..使用しているものの前にあるので、そのキューを直接操作できます。例えば:
import queue
import threading
def skip_get(q):
value = q.get(block=True)
try:
while True:
value = q.get(block=False)
except queue.Empty:
return value
q = queue.Queue()
def file_event_callback(event):
# code 256 for adding file to folder
if event.mask == 256:
fileChanged = event.name
q.put(fileChanged)
def consumer():
while True:
fileChanged = skip_get(q)
if fileChanged is None:
return
# do stuff with fileChanged
オブザーバーを起動する前に、次のようにします。
t = threading.Thread(target=consumer)
t.start()
そして最後に:
observer.join()
q.put(None)
t.join()
それで、これはどのように機能しますか?
まず、消費者側を見てみましょう。を呼び出すとq.get()
、キューから最初のものをポップします。しかし、何もない場合はどうなりますか?それがそのためのblock
議論です。false の場合get
、queue.Empty
例外が発生します。true の場合、get
何かがポップされるように見えるまで (スレッドセーフな方法で) 永久に待機します。したがって、一度ブロックすることで、まだ読み取るものが何もない場合を処理します。次に、ブロックせずにループすることで、キューにある他のものをすべて消費して、読み取るものが多すぎる場合に対処します。ポップしたものは何でも再割り当てし続けるためvalue
、最終的にキューに入れられるのは最後になります。
ここで、プロデューサー側を見てみましょう。を呼び出すと、キューq.put(value)
に入れられます。value
キューにサイズ制限を設定していない限り (私は設定していません)、これがブロックされることはありません。そのため、そのことについて心配する必要はありません。しかし、ここで、コンシューマ スレッドに終了したことをどのように知らせるのでしょうか? q.get(block=True)
それは永遠に待っています。それを目覚めさせる唯一の方法は、ポップする値を与えることです。センチネル値をプッシュし (この場合None
は、ファイル名として有効ではないため問題ありません)、コンシューマーがNone
終了することでそれを処理できるようにすることで、適切でクリーンなシャットダウン方法を自分自身に提供します。(そして、None
の後に何もプッシュしないため、誤ってスキップする可能性はありません。) したがって、単にプッシュすることができます。None
、(他のバグがなければ) コンシューマ スレッドが最終的に終了することを確認してください。つまりt.join()
、デッドロックを恐れずに終了するまで待つことができます。
.を使用すると、これをより簡単に実行できると上で述べましたCondition
。キューが実際にどのように機能するかを考えると、それは条件によって保護された単なるリスト (または deque など) です。コンシューマーは、何かが利用可能になるまで条件を待ち、プロデューサーは何かをリストに追加して利用可能にします。状態を知らせます。最後の値だけが必要な場合は、リストを作成する理由はありません。したがって、これを行うことができます:
class OneQueue(object):
def __init__(self):
self.value = None
self.condition = threading.Condition()
self.sentinel = object()
def get(self):
with self.condition:
while self.value is None:
self.condition.wait()
value, self.value = self.value, None
return value
def put(self, value):
with self.condition:
self.value = value
self.condition.notify()
def close(self):
self.put(self.sentinel)
(現在None
、何も利用できないことを知らせるために を使用しているため、完了したことを知らせる別の番兵を作成する必要がありました。)
この設計の問題点は、コンシューマーが忙しすぎて処理できないときにプロデューサーが複数の値を入力すると、それらの一部を見逃す可能性があることですが、この場合、その「問題」はまさにあなたが探していたものです。
それでも、低レベルのツールを使用するということは、常に間違いが多いことを意味します。これは、スレッド化の同期では特に危険です。なぜなら、頭を包み込むのが難しく、理解している場合でもデバッグするのが難しい問題が含まれているためです。とにかくa を使用したほうがよいかもしれませQueue
ん。