RedBaron
RedBaron

Reputation: 4755

File Processor using multiprocessing

I am writing a file processor that can (hopefully) parse arbitrary files and perform arbitrary actions on the parsed contents. The file processor needs to run continuously. The basic idea that I am following is

  1. Each file will have two associated processes (One for reading, other for Parsing and writing somewhere else)
  2. The reader will read a line into a common buffer(say a Queue) till EOF or buffer full. Then wait(sleep)
  3. Writer will read from buffer, parse the stuff, write it to (say) DB till buffer not empty. Then wait(sleep)
  4. Interrupting the main program will cause the reader/writer to exit safely (buffer can be washed away without writing)

The program runs fine. But, sometimes Writer will initialize first and find the buffer empty. So it will go to sleep. The Reader will fill the buffer and sleep too. So for sleep_interval my code does nothing. To get around that thing, I tried using a multiprocessing.Event() to signal to the writer that the buffer has some entries which it may process.

My code is

import multiprocessing
import time
import sys
import signal
import Queue

class FReader(multiprocessing.Process): 
    """
    A basic file reader class
    It spawns a new process that shares a queue with the writer process
    """
    def __init__(self,queue,fp,sleep_interval,read_offset,event): 
        self.queue = queue
        self.fp = fp
        self.sleep_interval = sleep_interval
        self.offset = read_offset
        self.fp.seek(self.offset)
        self.event = event
        self.event.clear()
        super(FReader,self).__init__()

    def myhandler(self,signum,frame): 
        self.fp.close()
        print "Stopping Reader"
        sys.exit(0)

    def run(self): 
        signal.signal(signal.SIGINT,self.myhandler)
        signal.signal(signal.SIGCLD,signal.SIG_DFL)
        signal.signal(signal.SIGILL,self.myhandler)
        while True: 
            sleep_now = False
            if not self.queue.full(): 
                print "READER:Reading"
                m = self.fp.readline()
                if not self.event.is_set(): 
                    self.event.set()
                if m: 
                    self.queue.put((m,self.fp.tell()),block=False)
                else: 
                    sleep_now = True 
            else: 
                print "Queue Full"
                sleep_now = True

            if sleep_now: 
                print "Reader sleeping for %d seconds"%self.sleep_interval
                time.sleep(self.sleep_interval)            

class FWriter(multiprocessing.Process): 
    """
    A basic file writer class
    It spawns a new process that shares a queue with the reader process
    """
    def __init__(self,queue,session,sleep_interval,fp,event): 
        self.queue = queue
        self.session = session
        self.sleep_interval = sleep_interval
        self.offset = 0
        self.queue_offset = 0
        self.fp = fp
        self.dbqueue = Queue.Queue(50)
        self.event = event
        self.event.clear()
        super(FWriter,self).__init__()

    def myhandler(self,signum,frame): 
        #self.session.commit()
        self.session.close()
        self.fp.truncate()
        self.fp.write(str(self.offset))
        self.fp.close()
        print "Stopping Writer"
        sys.exit(0)

    def process_line(self,line): 
        #Do not process comments
        if line[0] == '#': 
            return None
        my_list = []
        split_line = line.split(',')
        my_list = split_line
        return my_list

    def run(self): 
        signal.signal(signal.SIGINT,self.myhandler)
        signal.signal(signal.SIGCLD,signal.SIG_DFL)
        signal.signal(signal.SIGILL,self.myhandler)
        while True: 
            sleep_now = False
            if not self.queue.empty(): 
                print "WRITER:Getting"
                line,offset = self.queue.get(False)
                #Process the line just read
                proc_line = self.process_line(line)
                if proc_line: 
                    #Must write it to DB. Put it into DB Queue
                    if self.dbqueue.full(): 
                        #DB Queue is full, put data into DB before putting more data
                        self.empty_dbqueue()
                    self.dbqueue.put(proc_line)
                    #Keep a track of the maximum offset in the queue
                    self.queue_offset = offset if offset > self.queue_offset else self.queue_offset
            else: 
                #Looks like writing queue is empty. Just check if DB Queue is empty too
                print "WRITER: Empty Read Queue"
                self.empty_dbqueue()
                sleep_now = True
            if sleep_now: 
                self.event.clear()
                print "WRITER: Sleeping for %d seconds"%self.sleep_interval
                #time.sleep(self.sleep_interval)
                self.event.wait(5) 



    def empty_dbqueue(self): 
        #The DB Queue has many objects waiting to be written to the DB. Lets write them 
        print "WRITER:Emptying DB QUEUE"
        while True: 
            try: 
                new_line = self.dbqueue.get(False)
            except Queue.Empty: 
                #Write the new offset to file
                self.offset = self.queue_offset
                break
            print new_line[0]

def main(): 
    write_file = '/home/xyz/stats.offset'
    wp = open(write_file,'r')
    read_offset = wp.read()
    try: 
        read_offset = int(read_offset)
    except ValueError: 
        read_offset = 0
    wp.close()
    print read_offset
    read_file = '/var/log/somefile'
    file_q = multiprocessing.Queue(100)
    ev = multiprocessing.Event()
    new_reader = FReader(file_q,open(read_file,'r'),30,read_offset,ev)
    new_writer = FWriter(file_q,open('/dev/null'),30,open(write_file,'w'),ev)
    new_reader.start()
    new_writer.start()
    try: 
        new_reader.join()
        new_writer.join()
    except KeyboardInterrupt: 
        print "Closing Master"
        new_reader.join()
        new_writer.join()

if __name__=='__main__': 
    main()

The dbqueue in Writer is for batching together Database writes and for each line I keep the offset of that line. The maximum offset written into DB is stored into offset file on exit so that I can pick up where I left on next run. The DB object (session) is just '/dev/null' for demo.

Previously rather than do

self.event.wait(5)

I was doing

time.sleep(self.sleep_interval)

Which (as I have said) worked well but introduced a little delay. But then the processes exited perfectly.

Now on doing a Ctrl-C on the main process, the reader exits but the writer throws an OSError

^CStopping Reader
Closing Master
Stopping Writer
Process FWriter-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/multiprocessing/process.py", line 232, in _bootstrap
    self.run()
  File "FileParse.py", line 113, in run
    self.event.wait(5)
  File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 303, in wait
    self._cond.wait(timeout)
  File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 212, in wait
    self._wait_semaphore.acquire(True, timeout)
OSError: [Errno 0] Error

I know event.wait() somehow blocks the code but I can't get how to overcome this. I tried wrapping self.event.wait(5) and sys.exit() in a try: except OSError: block but that only makes the program hang forever.

I am using Python-2.6

Upvotes: 2

Views: 657

Answers (1)

Joe M.
Joe M.

Reputation: 609

I think it would be better to use the Queue blocking timeout for the Writer class - using Queue.get(True, 5), then if during the time interval something was put into the queue, the Writer would wake up immediately.. The Writer loop would then be something like:

while True: 
    sleep_now = False
    try:
        print "WRITER:Getting"
        line,offset = self.queue.get(True, 5)
        #Process the line just read
        proc_line = self.process_line(line)
        if proc_line: 
            #Must write it to DB. Put it into DB Queue
            if self.dbqueue.full(): 
                #DB Queue is full, put data into DB before putting more data
                self.empty_dbqueue()
            self.dbqueue.put(proc_line)
            #Keep a track of the maximum offset in the queue
            self.queue_offset = offset if offset > self.queue_offset else self.queue_offset
    except Queue.Empty: 
        #Looks like writing queue is empty. Just check if DB Queue is empty too
        print "WRITER: Empty Read Queue"
        self.empty_dbqueue()

Upvotes: 1

Related Questions