2

私は、(うまくいけば)任意のファイルを解析し、解析されたコンテンツに対して任意のアクションを実行できるファイルプロセッサを作成しています。ファイルプロセッサは継続的に実行する必要があります。私がフォローしている基本的な考え方は

  1. 各ファイルには、2つの関連するプロセスがあります(1つは読み取り用、もう1つは解析と別の場所への書き込み用)
  2. Queueリーダーは、EOFまたはバッファーがいっぱいになるまで、共通のバッファー(たとえばa)に行を読み取ります。それから待つ(寝る)
  3. ライターはバッファーから読み取り、解析し、バッファーが空でなくなるまで(たとえば)DBに書き込みます。それから待つ(寝る)
  4. メインプログラムを中断すると、リーダー/ライターは安全に終了します(バッファーは書き込みなしで洗い流される可能性があります)

プログラムは正常に実行されます。ただし、Writerが最初に初期化して、バッファが空であることを検出する場合があります。だからそれは眠りにつくでしょう。リーダーはバッファーをいっぱいにしてスリープします。したがって、sleep_interval私のコードでは何もしません。これを回避するために、aを使用multiprocessing.Event()して、バッファに処理可能なエントリがいくつかあることをライターに通知してみました。

私のコードは

import multiprocessing
import time
import sys
import signal
import Queue

class FReader(multiprocessing.Process): 
    """
    A basic file reader class
    It spawns a new process that shares a queue with the writer process
    """
    def __init__(self,queue,fp,sleep_interval,read_offset,event): 
        self.queue = queue
        self.fp = fp
        self.sleep_interval = sleep_interval
        self.offset = read_offset
        self.fp.seek(self.offset)
        self.event = event
        self.event.clear()
        super(FReader,self).__init__()

    def myhandler(self,signum,frame): 
        self.fp.close()
        print "Stopping Reader"
        sys.exit(0)

    def run(self): 
        signal.signal(signal.SIGINT,self.myhandler)
        signal.signal(signal.SIGCLD,signal.SIG_DFL)
        signal.signal(signal.SIGILL,self.myhandler)
        while True: 
            sleep_now = False
            if not self.queue.full(): 
                print "READER:Reading"
                m = self.fp.readline()
                if not self.event.is_set(): 
                    self.event.set()
                if m: 
                    self.queue.put((m,self.fp.tell()),block=False)
                else: 
                    sleep_now = True 
            else: 
                print "Queue Full"
                sleep_now = True

            if sleep_now: 
                print "Reader sleeping for %d seconds"%self.sleep_interval
                time.sleep(self.sleep_interval)            

class FWriter(multiprocessing.Process): 
    """
    A basic file writer class
    It spawns a new process that shares a queue with the reader process
    """
    def __init__(self,queue,session,sleep_interval,fp,event): 
        self.queue = queue
        self.session = session
        self.sleep_interval = sleep_interval
        self.offset = 0
        self.queue_offset = 0
        self.fp = fp
        self.dbqueue = Queue.Queue(50)
        self.event = event
        self.event.clear()
        super(FWriter,self).__init__()

    def myhandler(self,signum,frame): 
        #self.session.commit()
        self.session.close()
        self.fp.truncate()
        self.fp.write(str(self.offset))
        self.fp.close()
        print "Stopping Writer"
        sys.exit(0)

    def process_line(self,line): 
        #Do not process comments
        if line[0] == '#': 
            return None
        my_list = []
        split_line = line.split(',')
        my_list = split_line
        return my_list

    def run(self): 
        signal.signal(signal.SIGINT,self.myhandler)
        signal.signal(signal.SIGCLD,signal.SIG_DFL)
        signal.signal(signal.SIGILL,self.myhandler)
        while True: 
            sleep_now = False
            if not self.queue.empty(): 
                print "WRITER:Getting"
                line,offset = self.queue.get(False)
                #Process the line just read
                proc_line = self.process_line(line)
                if proc_line: 
                    #Must write it to DB. Put it into DB Queue
                    if self.dbqueue.full(): 
                        #DB Queue is full, put data into DB before putting more data
                        self.empty_dbqueue()
                    self.dbqueue.put(proc_line)
                    #Keep a track of the maximum offset in the queue
                    self.queue_offset = offset if offset > self.queue_offset else self.queue_offset
            else: 
                #Looks like writing queue is empty. Just check if DB Queue is empty too
                print "WRITER: Empty Read Queue"
                self.empty_dbqueue()
                sleep_now = True
            if sleep_now: 
                self.event.clear()
                print "WRITER: Sleeping for %d seconds"%self.sleep_interval
                #time.sleep(self.sleep_interval)
                self.event.wait(5) 



    def empty_dbqueue(self): 
        #The DB Queue has many objects waiting to be written to the DB. Lets write them 
        print "WRITER:Emptying DB QUEUE"
        while True: 
            try: 
                new_line = self.dbqueue.get(False)
            except Queue.Empty: 
                #Write the new offset to file
                self.offset = self.queue_offset
                break
            print new_line[0]

def main(): 
    write_file = '/home/xyz/stats.offset'
    wp = open(write_file,'r')
    read_offset = wp.read()
    try: 
        read_offset = int(read_offset)
    except ValueError: 
        read_offset = 0
    wp.close()
    print read_offset
    read_file = '/var/log/somefile'
    file_q = multiprocessing.Queue(100)
    ev = multiprocessing.Event()
    new_reader = FReader(file_q,open(read_file,'r'),30,read_offset,ev)
    new_writer = FWriter(file_q,open('/dev/null'),30,open(write_file,'w'),ev)
    new_reader.start()
    new_writer.start()
    try: 
        new_reader.join()
        new_writer.join()
    except KeyboardInterrupt: 
        print "Closing Master"
        new_reader.join()
        new_writer.join()

if __name__=='__main__': 
    main()

Writerのdbqueueは、データベースの書き込みをまとめてバッチ処理するためのものであり、各行について、その行のオフセットを保持します。DBに書き込まれた最大オフセットは、終了時にオフセットファイルに保存されるため、次の実行で残した場所を取得できます。DBオブジェクト(セッション)は'/dev/null'デモ用です。

以前ではなく

self.event.wait(5)

やっていた

time.sleep(self.sleep_interval)

これは(私が言ったように)うまく機能しましたが、少し遅れが生じました。しかし、その後、プロセスは完全に終了しました。

メインプロセスでを実行Ctrl-Cすると、リーダーは終了しますが、ライターはOSErrorをスローします

^CStopping Reader
Closing Master
Stopping Writer
Process FWriter-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/multiprocessing/process.py", line 232, in _bootstrap
    self.run()
  File "FileParse.py", line 113, in run
    self.event.wait(5)
  File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 303, in wait
    self._cond.wait(timeout)
  File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 212, in wait
    self._wait_semaphore.acquire(True, timeout)
OSError: [Errno 0] Error

event.wait()がどういうわけかコードをブロックすることは知っていますが、これを克服する方法を見つけることができません。self.event.wait(5)私はラッピングとブロックを試しsys.exit()ましたtry: except OSError:が、それはプログラムを永久にハングさせるだけです。

Python-2.6を使用しています

4

1 に答える 1

1

Writerクラスにキューブロッキングタイムアウトを使用する方が良いと思います-Queue.get(True、5)を使用すると、時間間隔中に何かがキューに入れられた場合、ライターはすぐにウェイクアップします。ライターその場合、ループは次のようになります。

while True: 
    sleep_now = False
    try:
        print "WRITER:Getting"
        line,offset = self.queue.get(True, 5)
        #Process the line just read
        proc_line = self.process_line(line)
        if proc_line: 
            #Must write it to DB. Put it into DB Queue
            if self.dbqueue.full(): 
                #DB Queue is full, put data into DB before putting more data
                self.empty_dbqueue()
            self.dbqueue.put(proc_line)
            #Keep a track of the maximum offset in the queue
            self.queue_offset = offset if offset > self.queue_offset else self.queue_offset
    except Queue.Empty: 
        #Looks like writing queue is empty. Just check if DB Queue is empty too
        print "WRITER: Empty Read Queue"
        self.empty_dbqueue()
于 2012-09-21T11:17:03.613 に答える