1

現在、fsevents を使用してフォルダーを監視しています。ファイルが追加されるたびに、このファイルに対してコードが実行されます。毎秒新しいファイルがフォルダーに追加されます。

from fsevents import Observer, Stream

def file_event_callback(event):
    # code 256 for adding file to folder
    if event.mask == 256:
        fileChanged = event.name
        # do stuff with fileChanged file

if __name__ == "__main__":
    observer = Observer()   
    observer.start()
    stream = Stream(file_event_callback, 'folder', file_events=True)
    observer.schedule(stream)
    observer.join()

これは非常にうまく機能します。唯一の問題は、ライブラリがフォルダーに追加されたすべてのファイルのキューを作成していることです。file_event_callback 内で実行されるコードは、1 秒以上かかる場合があります。その場合、キュー内の他のアイテムはスキップして、最新のものだけが使用されるようにする必要があります。

最後のアイテムの後に使用されるフォルダーへの最新の追加のみが終了するように、キューからアイテムをスキップするにはどうすればよいですか?

最初にウォッチドッグを使用してみましたが、これは Mac で実行する必要があるため、思いどおりに動作させるのに苦労しました。

4

1 に答える 1

0

あなたがどのライブラリを使用しているのか正確にはわかりません.「これはキューを構築しています...」と言うとき、あなたが何を指しているのかわかりません..使用しているものの前にあるので、そのキューを直接操作できます。例えば:

import queue
import threading

def skip_get(q):
    value = q.get(block=True)
    try:
        while True:
            value = q.get(block=False)
    except queue.Empty:
        return value

q = queue.Queue()

def file_event_callback(event):
    # code 256 for adding file to folder
    if event.mask == 256:
        fileChanged = event.name
        q.put(fileChanged)

def consumer():
    while True:
        fileChanged = skip_get(q)
        if fileChanged is None:
            return
        # do stuff with fileChanged

オブザーバーを起動する前に、次のようにします。

t = threading.Thread(target=consumer)
t.start()

そして最後に:

observer.join()
q.put(None)
t.join()

それで、これはどのように機能しますか?

まず、消費者側を見てみましょう。を呼び出すとq.get()、キューから最初のものをポップします。しかし、何もない場合はどうなりますか?それがそのためのblock議論です。false の場合getqueue.Empty例外が発生します。true の場合、get何かがポップされるように見えるまで (スレッドセーフな方法で) 永久に待機します。したがって、一度ブロックすることで、まだ読み取るものが何もない場合を処理します。次に、ブロックせずにループすることで、キューにある他のものをすべて消費して、読み取るものが多すぎる場合に対処します。ポップしたものは何でも再割り当てし続けるためvalue、最終的にキューに入れられるのは最後になります。

ここで、プロデューサー側を見てみましょう。を呼び出すと、キューq.put(value)に入れられます。valueキューにサイズ制限を設定していない限り (私は設定していません)、これがブロックされることはありません。そのため、そのことについて心配する必要はありません。しかし、ここで、コンシューマ スレッドに終了したことをどのように知らせるのでしょうか? q.get(block=True)それは永遠に待っています。それを目覚めさせる唯一の方法は、ポップする値を与えることです。センチネル値をプッシュし (この場合Noneは、ファイル名として有効ではないため問題ありません)、コンシューマーがNone終了することでそれを処理できるようにすることで、適切でクリーンなシャットダウン方法を自分自身に提供します。(そして、Noneの後に何もプッシュしないため、誤ってスキップする可能性はありません。) したがって、単にプッシュすることができます。None、(他のバグがなければ) コンシューマ スレッドが最終的に終了することを確認してください。つまりt.join()、デッドロックを恐れずに終了するまで待つことができます。


.を使用すると、これをより簡単に実行できると上で述べましたCondition。キューが実際にどのように機能するかを考えると、それは条件によって保護された単なるリスト (または deque など) です。コンシューマーは、何かが利用可能になるまで条件を待ち、プロデューサーは何かをリストに追加して利用可能にします。状態を知らせます。最後の値だけが必要な場合は、リストを作成する理由はありません。したがって、これを行うことができます:

class OneQueue(object):
    def __init__(self):
        self.value = None
        self.condition = threading.Condition()
        self.sentinel = object()
    def get(self):
        with self.condition:
            while self.value is None:
                self.condition.wait()
            value, self.value = self.value, None
            return value
    def put(self, value):
        with self.condition:
            self.value = value
            self.condition.notify()
    def close(self):
        self.put(self.sentinel)

(現在None、何も利用できないことを知らせるために を使用しているため、完了したことを知らせる別の番兵を作成する必要がありました。)

この設計の問題点は、コンシューマーが忙しすぎて処理できないときにプロデューサーが複数の値を入力すると、それらの一部を見逃す可能性があることですが、この場合、その「問題」はまさにあなたが探していたものです。

それでも、低レベルのツールを使用するということは、常に間違いが多いことを意味します。これは、スレッド化の同期では特に危険です。なぜなら、頭を包み込むのが難しく、理解している場合でもデバッグするのが難しい問題が含まれているためです。とにかくa を使用したほうがよいかもしれませQueueん。

于 2014-09-23T06:08:20.507 に答える