371

変更を監視したい別のプロセスによって書き込まれているログファイルがあります。変更が発生するたびに、新しいデータを読み込んで処理を行いたいと思います。

これを行うための最良の方法は何ですか?PyWin32ライブラリから何らかのフックがあることを期待していました。win32file.FindNextChangeNotification関数を見つけましたが、特定のファイルを監視するように要求する方法がわかりません。

誰かがこのようなことをしたなら、私はその方法を聞いて本当に感謝しています...

[編集]私はポーリングを必要としない解決策を求めていたと述べるべきでした。

[編集]呪い!これは、マップされたネットワークドライブでは機能しないようです。Windowsは、ローカルディスクの場合とは異なり、ファイルの更新を「聞き取れない」と思います。

4

28 に答える 28

311

Watchdogを使ってみましたか?

ファイル システム イベントを監視するための Python API ライブラリとシェル ユーティリティ。

ディレクトリ監視を簡単に

  • クロスプラットフォーム API。
  • ディレクトリの変更に応じてコマンドを実行するシェル ツール。

クイックスタートの簡単な例からすぐに始めましょう...

于 2011-01-14T11:52:26.400 に答える
114

ポーリングで十分な場合は、「変更時間」ファイルの統計が変化するかどうかを監視します。それを読むには:

os.stat(filename).st_mtime

(また、Windows ネイティブの変更イベント ソリューションは、ネットワーク ドライブなど、すべての状況で機能するとは限らないことに注意してください。)

import os

class Monkey(object):
    def __init__(self):
        self._cached_stamp = 0
        self.filename = '/path/to/file'

    def ook(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
于 2008-10-08T11:34:14.850 に答える
81

http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.htmlで入手できるドキュメントはもうご覧になりましたか? Windowsでのみ動作する必要がある場合、2番目の例はまさにあなたが望むものと思われます(ディレクトリのパスを監視したいファイルの1つと交換する場合)。

それ以外の場合、ポーリングはおそらくプラットフォームに依存しない唯一のオプションになります。

注:これらの解決策は試していません。

于 2008-10-08T11:29:43.330 に答える
53

マルチプラットフォーム ソリューションが必要な場合は、QFileSystemWatcherを確認してください。ここにコード例があります(サニタイズされていません):

from PyQt4 import QtCore

@QtCore.pyqtSlot(str)
def directory_changed(path):
    print('Directory Changed!!!')

@QtCore.pyqtSlot(str)
def file_changed(path):
    print('File Changed!!!')

fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)
于 2011-03-17T13:45:31.360 に答える
32

Windows では動作しないはずですが (おそらく cygwin でしょうか?)、UNIX ユーザーの場合は「fcntl」システム コールを使用する必要があります。これは Python での例です。Cで書く必要がある場合は、ほとんど同じコードです(同じ関数名)

import time
import fcntl
import os
import signal

FNAME = "/HOME/TOTO/FILETOWATCH"

def handler(signum, frame):
    print "File %s modified" % (FNAME,)

signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME,  os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
            fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

while True:
    time.sleep(10000)
于 2009-01-23T16:08:40.207 に答える
20

pyinotifyをチェックしてください。

inotify は、新しい Linux の dnotify (以前の回答から) を置き換え、ディレクトリ レベルではなくファイル レベルの監視を可能にします。

于 2010-06-13T05:12:49.907 に答える
13

Tim Golden のスクリプトを少しハッキングした後、非常にうまく機能しているように見える次のものができました。

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

エラーチェックをロードすることでおそらく可能ですが、ログファイルを監視して、画面に出力する前に何らかの処理を行うだけであれば、これはうまく機能します。

ご意見をお寄せいただきありがとうございます。すばらしい内容です。

于 2008-10-08T14:05:53.203 に答える
12

ポーリングと最小限の依存関係を使用して単一のファイルを監視するために、 Deestanからの回答(上記)に基づいて、完全に肉付けされた例を次に示します。

import os
import sys 
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self.filename = watch_file
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
            print('File changed')
            if self.call_func_on_change is not None:
                self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop        
    def watch(self):
        while self.running: 
            try: 
                # Look for changes
                time.sleep(self.refresh_delay_secs) 
                self.look() 
            except KeyboardInterrupt: 
                print('\nDone') 
                break 
            except FileNotFoundError:
                # Action on file not found
                pass
            except: 
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)

watch_file = 'my_file.txt'

# watcher = Watcher(watch_file)  # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
watcher.watch()  # start the watch going
于 2018-02-27T11:30:25.837 に答える
9

同様の質問に対する私の回答を確認してください。Python で同じループを試すことができます。このページでは次のことを提案しています。

import time

while 1:
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        print line, # already has newline

質問tail() a file with Pythonも参照してください。

于 2008-10-08T11:28:12.243 に答える
8

これは、UNIXタイプで実行され、dict(file => time)を使用してファイルを変更するための単純なウォッチャーを追加するTimGoldanのスクリプトの別の変更です。

使用法:whateverName.py path_to_dir_to_watch

#!/usr/bin/env python

import os, sys, time

def files_to_timestamp(path):
    files = [os.path.join(path, f) for f in os.listdir(path)]
    return dict ([(f, os.path.getmtime(f)) for f in files])

if __name__ == "__main__":

    path_to_watch = sys.argv[1]
    print('Watching {}..'.format(path_to_watch))

    before = files_to_timestamp(path_to_watch)

    while 1:
        time.sleep (2)
        after = files_to_timestamp(path_to_watch)

        added = [f for f in after.keys() if not f in before.keys()]
        removed = [f for f in before.keys() if not f in after.keys()]
        modified = []

        for f in before.keys():
            if not f in removed:
                if os.path.getmtime(f) != before.get(f):
                    modified.append(f)

        if added: print('Added: {}'.format(', '.join(added)))
        if removed: print('Removed: {}'.format(', '.join(removed)))
        if modified: print('Modified: {}'.format(', '.join(modified)))

        before = after
于 2013-02-25T16:05:07.080 に答える
8

これは、同じトリックを実行しているように見え、ファイル全体をインポートしないように見える Kender のコードの簡略化されたバージョンです。

# Check file for new data.

import time

f = open(r'c:\temp\test.txt', 'r')

while True:

    line = f.readline()
    if not line:
        time.sleep(1)
        print 'Nothing New'
    else:
        print 'Call Function: ', line
于 2009-12-08T16:09:42.800 に答える
7

Pythonを使用しているので、ファイルを開いてそのファイルから行を読み続けることができます。

f = open('file.log')

読み取った行がでない場合は、それを処理します。

line = f.readline()
if line:
    // Do what you want with the line

readlineEOFに電話をかけ続けても大丈夫だということを見逃しているかもしれません。この場合、空の文字列を返し続けます。また、ログファイルに何かが追加されると、必要に応じて、停止した場所から読み取りが続行されます。

イベントまたは特定のライブラリを使用するソリューションを探している場合は、質問でこれを指定してください。そうでなければ、この解決策は問題ないと思います。

于 2008-10-08T12:18:25.980 に答える
4

Horst Gutmannが指摘したTim Golden の記事でわかるように、WIN32 は比較的複雑で、単一のファイルではなくディレクトリを監視します。

.NET Python 実装であるIronPythonを調べることをお勧めします。IronPython を使用すると、すべての.NET機能を使用できます。

System.IO.FileSystemWatcher

シンプルなEventインターフェイスで単一のファイルを処理します。

于 2008-10-08T11:42:12.247 に答える
2

これは、ファイルの変更をチェックする例です。それを行うための最良の方法ではないかもしれませんが、それは確かに短い方法です.

ソースに変更が加えられたときにアプリケーションを再起動するための便利なツール。これは pygame で遊んでいるときに作成したもので、ファイルを保存した直後に効果が発生するのを確認できます。

pygame で使用する場合は、'while' ループ内の要素がゲーム ループ (更新など) に配置されていることを確認してください。そうしないと、アプリケーションが無限ループに陥り、ゲームの更新が見られなくなります。

file_size_stored = os.stat('neuron.py').st_size

  while True:
    try:
      file_size_current = os.stat('neuron.py').st_size
      if file_size_stored != file_size_current:
        restart_program()
    except: 
      pass

ウェブで見つけた再起動コードが必要な場合。ここにあります。(質問には関係ありませんが、便利になる可能性があります)

def restart_program(): #restart application
    python = sys.executable
    os.execl(python, python, * sys.argv)

電子を自分のやりたいように動かして楽しんでください。

于 2014-04-20T11:11:19.840 に答える
2
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001

class myThread (threading.Thread):
    def __init__(self, threadID, fileName, directory, origin):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.fileName = fileName
        self.daemon = True
        self.dir = directory
        self.originalFile = origin
    def run(self):
        startMonitor(self.fileName, self.dir, self.originalFile)

def startMonitor(fileMonitoring,dirPath,originalFile):
    hDir = win32file.CreateFile (
        dirPath,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )
    # Wait for new data and call ProcessNewData for each new chunk that's
    # written
    while 1:
        # Wait for a change to occur
        results = win32file.ReadDirectoryChangesW (
            hDir,
            1024,
            False,
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
            None,
            None
        )
        # For each change, check to see if it's updating the file we're
        # interested in
        for action, file_M in results:
            full_filename = os.path.join (dirPath, file_M)
            #print file, ACTIONS.get (action, "Unknown")
            if len(full_filename) == len(fileMonitoring) and action == 3:
                #copy to main file
                ...
于 2013-09-22T18:40:48.793 に答える
1

これは、1 秒あたり 1 行しか書き込みませんが、通常ははるかに少ない入力ファイルを監視するための例です。目標は、指定された出力ファイルに最後の行 (最新の書き込み) を追加することです。これを自分のプロジェクトの 1 つからコピーし、無関係な行をすべて削除しました。不足している記号を入力または変更する必要があります。

from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow   # Qt Creator gen'd 

class MainWindow(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        QMainWindow.__init__(self, parent)
        Ui_MainWindow.__init__(self)
        self._fileWatcher = QFileSystemWatcher()
        self._fileWatcher.fileChanged.connect(self.fileChanged)

    def fileChanged(self, filepath):
        QThread.msleep(300)    # Reqd on some machines, give chance for write to complete
        # ^^ About to test this, may need more sophisticated solution
        with open(filepath) as file:
            lastLine = list(file)[-1]
        destPath = self._filemap[filepath]['dest file']
        with open(destPath, 'a') as out_file:               # a= append
            out_file.writelines([lastLine])

もちろん、包含 QMainWindow クラスは厳密には必須ではありません。QFileSystemWatcher を単独で使用できます。

于 2016-09-21T01:53:44.947 に答える
0
import inotify.adapters
from datetime import datetime


LOG_FILE='/var/log/mysql/server_audit.log'


def main():
    start_time = datetime.now()
    while True:
        i = inotify.adapters.Inotify()
        i.add_watch(LOG_FILE)
        for event in i.event_gen(yield_nones=False):
            break
        del i

        with open(LOG_FILE, 'r') as f:
            for line in f:
                entry = line.split(',')
                entry_time = datetime.strptime(entry[0],
                                               '%Y%m%d %H:%M:%S')
                if entry_time > start_time:
                    start_time = entry_time
                    print(entry)


if __name__ == '__main__':
    main()
于 2021-05-29T20:49:46.350 に答える
-4

Windows固有の機能はわかりません。ファイルの MD5 ハッシュを毎秒/分/時間 (必要な速度によって異なります) 取得して、最後のハッシュと比較することができます。異なる場合は、ファイルが変更されていることがわかり、最新の行を読み上げます。

于 2008-10-08T11:25:57.270 に答える
-6

私はこのようなものを試してみます。

    try:
            f = open(filePath)
    except IOError:
            print "No such file: %s" % filePath
            raw_input("Press Enter to close window")
    try:
            lines = f.readlines()
            while True:
                    line = f.readline()
                    try:
                            if not line:
                                    time.sleep(1)
                            else:
                                    functionThatAnalisesTheLine(line)
                    except Exception, e:
                            # handle the exception somehow (for example, log the trace) and raise the same exception again
                            raw_input("Press Enter to close window")
                            raise e
    finally:
            f.close()

ループは、最後にファイルが読み取られてから新しい行があるかどうかをチェックします。ある場合は、読み取られてfunctionThatAnalisesTheLine関数に渡されます。そうでない場合、スクリプトは 1 秒間待ってからプロセスを再試行します。

于 2008-10-08T11:26:44.960 に答える