私は、(うまくいけば)任意のファイルを解析し、解析されたコンテンツに対して任意のアクションを実行できるファイルプロセッサを作成しています。ファイルプロセッサは継続的に実行する必要があります。私がフォローしている基本的な考え方は
- 各ファイルには、2つの関連するプロセスがあります(1つは読み取り用、もう1つは解析と別の場所への書き込み用)
Queue
リーダーは、EOFまたはバッファーがいっぱいになるまで、共通のバッファー(たとえばa)に行を読み取ります。それから待つ(寝る)- ライターはバッファーから読み取り、解析し、バッファーが空でなくなるまで(たとえば)DBに書き込みます。それから待つ(寝る)
- メインプログラムを中断すると、リーダー/ライターは安全に終了します(バッファーは書き込みなしで洗い流される可能性があります)
プログラムは正常に実行されます。ただし、Writerが最初に初期化して、バッファが空であることを検出する場合があります。だからそれは眠りにつくでしょう。リーダーはバッファーをいっぱいにしてスリープします。したがって、sleep_interval
私のコードでは何もしません。これを回避するために、aを使用multiprocessing.Event()
して、バッファに処理可能なエントリがいくつかあることをライターに通知してみました。
私のコードは
import multiprocessing
import time
import sys
import signal
import Queue
class FReader(multiprocessing.Process):
"""
A basic file reader class
It spawns a new process that shares a queue with the writer process
"""
def __init__(self,queue,fp,sleep_interval,read_offset,event):
self.queue = queue
self.fp = fp
self.sleep_interval = sleep_interval
self.offset = read_offset
self.fp.seek(self.offset)
self.event = event
self.event.clear()
super(FReader,self).__init__()
def myhandler(self,signum,frame):
self.fp.close()
print "Stopping Reader"
sys.exit(0)
def run(self):
signal.signal(signal.SIGINT,self.myhandler)
signal.signal(signal.SIGCLD,signal.SIG_DFL)
signal.signal(signal.SIGILL,self.myhandler)
while True:
sleep_now = False
if not self.queue.full():
print "READER:Reading"
m = self.fp.readline()
if not self.event.is_set():
self.event.set()
if m:
self.queue.put((m,self.fp.tell()),block=False)
else:
sleep_now = True
else:
print "Queue Full"
sleep_now = True
if sleep_now:
print "Reader sleeping for %d seconds"%self.sleep_interval
time.sleep(self.sleep_interval)
class FWriter(multiprocessing.Process):
"""
A basic file writer class
It spawns a new process that shares a queue with the reader process
"""
def __init__(self,queue,session,sleep_interval,fp,event):
self.queue = queue
self.session = session
self.sleep_interval = sleep_interval
self.offset = 0
self.queue_offset = 0
self.fp = fp
self.dbqueue = Queue.Queue(50)
self.event = event
self.event.clear()
super(FWriter,self).__init__()
def myhandler(self,signum,frame):
#self.session.commit()
self.session.close()
self.fp.truncate()
self.fp.write(str(self.offset))
self.fp.close()
print "Stopping Writer"
sys.exit(0)
def process_line(self,line):
#Do not process comments
if line[0] == '#':
return None
my_list = []
split_line = line.split(',')
my_list = split_line
return my_list
def run(self):
signal.signal(signal.SIGINT,self.myhandler)
signal.signal(signal.SIGCLD,signal.SIG_DFL)
signal.signal(signal.SIGILL,self.myhandler)
while True:
sleep_now = False
if not self.queue.empty():
print "WRITER:Getting"
line,offset = self.queue.get(False)
#Process the line just read
proc_line = self.process_line(line)
if proc_line:
#Must write it to DB. Put it into DB Queue
if self.dbqueue.full():
#DB Queue is full, put data into DB before putting more data
self.empty_dbqueue()
self.dbqueue.put(proc_line)
#Keep a track of the maximum offset in the queue
self.queue_offset = offset if offset > self.queue_offset else self.queue_offset
else:
#Looks like writing queue is empty. Just check if DB Queue is empty too
print "WRITER: Empty Read Queue"
self.empty_dbqueue()
sleep_now = True
if sleep_now:
self.event.clear()
print "WRITER: Sleeping for %d seconds"%self.sleep_interval
#time.sleep(self.sleep_interval)
self.event.wait(5)
def empty_dbqueue(self):
#The DB Queue has many objects waiting to be written to the DB. Lets write them
print "WRITER:Emptying DB QUEUE"
while True:
try:
new_line = self.dbqueue.get(False)
except Queue.Empty:
#Write the new offset to file
self.offset = self.queue_offset
break
print new_line[0]
def main():
write_file = '/home/xyz/stats.offset'
wp = open(write_file,'r')
read_offset = wp.read()
try:
read_offset = int(read_offset)
except ValueError:
read_offset = 0
wp.close()
print read_offset
read_file = '/var/log/somefile'
file_q = multiprocessing.Queue(100)
ev = multiprocessing.Event()
new_reader = FReader(file_q,open(read_file,'r'),30,read_offset,ev)
new_writer = FWriter(file_q,open('/dev/null'),30,open(write_file,'w'),ev)
new_reader.start()
new_writer.start()
try:
new_reader.join()
new_writer.join()
except KeyboardInterrupt:
print "Closing Master"
new_reader.join()
new_writer.join()
if __name__=='__main__':
main()
Writerのdbqueueは、データベースの書き込みをまとめてバッチ処理するためのものであり、各行について、その行のオフセットを保持します。DBに書き込まれた最大オフセットは、終了時にオフセットファイルに保存されるため、次の実行で残した場所を取得できます。DBオブジェクト(セッション)は'/dev/null'
デモ用です。
以前ではなく
self.event.wait(5)
やっていた
time.sleep(self.sleep_interval)
これは(私が言ったように)うまく機能しましたが、少し遅れが生じました。しかし、その後、プロセスは完全に終了しました。
メインプロセスでを実行Ctrl-C
すると、リーダーは終了しますが、ライターはOSErrorをスローします
^CStopping Reader
Closing Master
Stopping Writer
Process FWriter-2:
Traceback (most recent call last):
File "/usr/lib64/python2.6/multiprocessing/process.py", line 232, in _bootstrap
self.run()
File "FileParse.py", line 113, in run
self.event.wait(5)
File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 303, in wait
self._cond.wait(timeout)
File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 212, in wait
self._wait_semaphore.acquire(True, timeout)
OSError: [Errno 0] Error
event.wait()がどういうわけかコードをブロックすることは知っていますが、これを克服する方法を見つけることができません。self.event.wait(5)
私はラッピングとブロックを試しsys.exit()
ましたtry: except OSError:
が、それはプログラムを永久にハングさせるだけです。
Python-2.6を使用しています