17

現在、単一のディレクトリとその下のすべてのサブディレクトリの変更を監視し、出力を LoggingEventHandler に渡す基本的な機能スクリプトがあります。

スクリプトを拡張して 3 つの別々の場所を監視したいのですが、複数のオブザーバーを生成して、指定した各パスを監視する方法を理解できません。

私は次の行に沿って何かを試みました:

import time
import thread
import threading
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

event_handler = LoggingEventHandler()
observer = Observer()

paths = ["C:\dir1", "C:\dir2", "C:\dir3"]

for i in paths:
    targetPath = str(i)
    observer.schedule(event_handler, targetPath, recursive=True)
    observer.start_new_thread()

残念ながら、オブザーバーに属性「start_new_thread」がないことを示すエラーを受け取りました

ディレクトリを監視する複数のオブザーバーを示すドキュメント内の例はありません。私はスレッドを扱った経験がなく、正しい軌道に乗っているかどうかさえわかりません。

代わりに、パスごとにオブザーバー クラスの新しいインスタンスを作成する必要がありますか? または、Observer クラスの単一のインスタンス、複数のパスを供給する方法はありますか?

明白な答えがある場合はお詫び申し上げます。これは完全に間違っていると確信しています。疲れすぎて理解できません。

追加:

@FogleBird のおかげで、スレッド開始の問題は修正されましたが、3 つの別々のオブザーバーが異なるパスを監視するのではなく、1 つのインスタンスしか残っていません。私の変更されたコードは次のようになります。

threads = []

for i in paths:
    targetPath = str(i)
    observer.schedule(event_handler, targetPath, recursive=True)
    threads.append(observer)

observer.start()
print threads

これは 3 つの ObservedWatch オブジェクトを返しますが、詳細はすべて同じです。

[<Observer(Thread-1, started daemon 1548)>, <Observer(Thread-1, started daemon 1548)>, <Observer(Thread-1, started daemon 1548)>]

まだ完全に間違っているように見えますが、これ以上の助けは素晴らしいでしょう。私はこの概念を理解するのに苦労しています。

追加 2:

私はコードをいじり続けましたが、今では機能しているように見えるものがあります:

event_handler = LoggingEventHandler()
N2watch = Observer()
threads = []

for i in paths:
    targetPath = str(i)
    N2watch.schedule(event_handler, targetPath, recursive=True)
    threads.append(N2watch)

N2watch.start()

try:
    while True:
            time.sleep(1)
except KeyboardInterrupt:
    N2watch.stop()
N2watch.join()

最初の実行から収集できるものから、リストで指定された 3 つのパス名すべての変更が出力に反映されているように見えましたが、確認するためにいくつかのテスト コードを記述する必要があります。

これがどのように動作するかはまだわからないので、さらにコメントをいただければ幸いです。

乾杯。

追加 3:

FogleBird の回答は、単に唯一のものであり、最初のコードの問題を強調していたため、最良の回答としてマークしました。

私の以前の編集には、複数の場所を監視するための完全に機能するコードが含まれており、現在は正しく機能しているようです。

4

4 に答える 4

9

素晴らしい質問です。このスレッドは古いですが、正確なものを調べているときに見つけたので、作業を拡張し、監視するディレクトリのリストを含むファイルを渡す機能を追加しました. デフォルトでは、再帰的に調べません。テストは他の人に任せます。うまくいけば、これは誰でも同じトピックを検索するのに役立ちます. すごい仕事!

Pythonwatcher.pyファイル名を使用して実行します。

私がスクリプトと呼んだものはどこwatcher.pyにあり、ファイル名はパスを含むファイルの名前です。

ファイル内の完全なパスをリストし、これらは改行で区切られています。

すなわち:

C:\path1
C:\Path2\subpath1
C:\PATH3

watcher.py

import logging
import sys
import time
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

# Attach a logging event AKA FileSystemEventHandler
event_handler = LoggingEventHandler()

# Create Observer to watch directories
observer = Observer()

# Take in list of paths. If none given, watch CWD
paths = open(sys.argv[1], 'r') if len(sys.argv) > 1 else '.'

# Empty list of observers
observers = []

# Base logging configuration
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')

# Iterate through paths and attach observers
for line in paths:

    # Convert line into string and strip newline character
    targetPath = str(line).rstrip()

    # Schedules watching of a given path
    observer.schedule(event_handler, targetPath)

    # Add observable to list of observers
    observers.append(observer)

# Start observer
observer.start()

try:
    while True:

        # Poll every second
        time.sleep(1)

except KeyboardInterrupt:
    for o in observers:
        o.unschedule_all()

        # Stop observer if interrupted
        o.stop()

for o in observers:

    # Wait until the thread terminates before exit
    o.join()
于 2016-09-07T15:27:18.633 に答える
5

いくつかのメモを追加したいだけです:

コード内のスレッド ライブラリとスレッド リストは、watchdog を使い始めたばかりの人 (私を含む) にとっては少し混乱する可能性があります。それらは実際にはソリューションには必要ありません。それを説明する簡単な方法は次のとおりです。

  • 1 つのオブザーバーを作成する
  • 複数の「視聴イベント」をスケジュールする
  • オブザーバーを起動します。

それでおしまい。

于 2016-10-12T22:43:31.473 に答える
1

複数のディレクトリを監視するために使用するコードを次に示します。

import sys
import time
import logging
from watchdog.observers.polling import PollingObserver as Observer
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    event_handler = LoggingEventHandler()
    observer = Observer()
    if len(sys.argv) > 1:
        for i in range(1, len(sys.argv)):
            observer.schedule(event_handler, sys.argv[i], recursive=True)
    else:
        observer.schedule(event_handler, '.', recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
于 2019-03-13T23:22:48.443 に答える