GP89

Reputation: 6730

Queueing work, without using a lot of memory

I'm trying to set something up where one thread is writing a list of work and another thread is reading the list and working from it. This list can be very large, so to avoid holding the whole thing in memory I want to have it written to a file (or any other way of saving memory; would generators help?).

I put together a little runnable example with a sleep in the writer so that the reader can catch up. I'm wondering how I can get the reader not to stop when it "overtakes" the writer. I looked at using .seek and .tell, but I got weird behaviour and I'm not sure that's the right route.

As another question, is this at all a sensible idea? Maybe there's a much more elegant way I can queue up a list of strings without using loads of memory.

import threading,time

class Writer(threading.Thread):

  lock= threading.Lock()

  def __init__(self,file_path,size):
    threading.Thread.__init__(self)
    self.file_path= file_path
    self.size= size
    self.i=0

  def how_many(self):
    with self.lock:
      print "Reader starting, writer is on",self.i

  def run(self):
    f=open(self.file_path,"w")
    for i in xrange(self.size):
      with self.lock:
        self.i=i
      if i%1000==0:
        time.sleep(0.1)
      f.write("%s\n"%i)
    f.close()

class Reader(threading.Thread):

  def __init__(self,file_path):
    threading.Thread.__init__(self)
    self.file_path= file_path

  def run(self):
    f=open(self.file_path,"r")
    line=0
    for line in f:
      pass
    print "Reader got to: %s"%line.strip()


if __name__ == "__main__":
  a= Writer("testfile",2000000)
  b= Reader("testfile")
  a.start()
  time.sleep(1)
  a.how_many()
  b.start()

Upvotes: 2

Views: 2876

Answers (3)

GP89

Reputation: 6730

I did solve this, using a buffered file queue where the queue is spread out between memory and a file. Items are put into a queue, but once the items in the queue exceed the specified queue size, any overflow is stored in a file to save memory and is read back out of the queue just the same.

If anyone is looking to do something similar, I've put it on GitHub here.
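Roughly, the mechanism looks something like this (a much simplified sketch, not the actual code from the repository; the FileBufferedQueue name and its details are invented for illustration, and it assumes items are newline-free strings with a single producer and consumer):

import tempfile, threading
from collections import deque

class FileBufferedQueue(object):
    def __init__(self, maxsize=1024):
        self.maxsize = maxsize                      # how many items may live in memory
        self.memory = deque()                       # in-memory part of the queue
        self.overflow = tempfile.TemporaryFile()    # disk-backed overflow
        self.overflow_count = 0
        self.lock = threading.Lock()

    def put(self, item):
        with self.lock:
            if self.overflow_count == 0 and len(self.memory) < self.maxsize:
                self.memory.append(item)            # fits in memory
            else:
                self.overflow.write(item + "\n")    # spill to disk, keeping FIFO order
                self.overflow_count += 1

    def get(self):
        with self.lock:
            if not self.memory and self.overflow_count:
                self._refill()                      # pull a batch back off disk
            return self.memory.popleft()            # raises IndexError when empty

    def _refill(self):
        # Read up to maxsize lines back into memory; rewrite the rest to a fresh temp file.
        self.overflow.seek(0)
        new_overflow = tempfile.TemporaryFile()
        count = 0
        for line in self.overflow:
            if count < self.maxsize:
                self.memory.append(line.rstrip("\n"))
            else:
                new_overflow.write(line)
            count += 1
        self.overflow.close()
        self.overflow = new_overflow
        self.overflow_count = max(0, count - self.maxsize)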

Upvotes: 2

zchtodd

Reputation: 1150

The multiprocessing JoinableQueue class is designed to allow limiting the backlog that can build up while waiting for child threads / processes to consume tasks. I'm going to assume you're reading work in from a file, and that the file is too large to easily hold in memory all at once.

The following is my attempt at a solution that should limit memory usage. In this example I'm processing a newline terminated series of dates, converting them into a standard format, and writing them back out to a new file.

I'm by no means an expert with the multiprocessing module, so should anyone see a mistake / better way to do it, I would like to hear it.

from multiprocessing import Process, Queue, JoinableQueue
from Queue import Empty
import time

date_formats =  [
    "%Y%m",
    "%Y-%m-%d", 
    "%y-%m-%d", 
    "%y%m%d", 
    "%Y%m%d", 
    "%m/%d/%Y", 
    "%m/%d/%y", 
    "%m/%d/%Y %H:%M",
    "%m%d%y", 
    "%m%d%Y", 
    "%B, %d %Y", 
    "%B, %d %y", 
    "%d %B, %Y", 
    "%d %B, %y",
    "%B %d %Y", 
    "%B %d %y", 
    "%B %d, %Y", 
    "%B %d, %y", 
    "%B %d %Y", 
    "%B %d %y",
    "%b %d %Y", 
    "%b %d, %Y", 
    "%b %d %y", 
    "%b %d, %y", 
    "%d-%b-%y", 
    "%Y-%m-%d %H:%M:%S"
]

def convert_date(date):
    date = date.strip()
    for dateformat in date_formats:
        try:
            converted = time.strptime(date, dateformat)
            converted = time.strftime("%Y-%m-%d", converted)
            return converted
        except ValueError:
            continue

def writer(result_queue):
    f = open("iso_dates.out", "wb")
    while True:
        try:
            date = result_queue.get(timeout=1)
            f.write(date + '\n')
        except Empty:
            # No results for a second: assume the workers are finished.
            break
    f.close()

def worker(work_queue, result_queue):
    while True:
        date = work_queue.get()

        if not date:
            # A falsy item would act as a stop signal; in this example the
            # workers are daemonic and simply die when the main process exits.
            break

        result_queue.put(convert_date(date))
        work_queue.task_done()

dates        = open("dates.out", "rb")
work_queue   = JoinableQueue(512) #allow no more than 512 items on queue
result_queue = Queue()
writer_proc  = Process(target=writer, args=(result_queue,))
worker_procs = 2

for i in range(worker_procs):
    p = Process(target=worker, args=(work_queue, result_queue))
    p.daemon = True
    p.start()

writer_proc.start()
for date in dates:
    work_queue.put(date) #will block until tasks are consumed if maxsize is encountered

work_queue.join()
dates.close()

Upvotes: 0

wberry

Reputation: 19347

For sending messages between threads, the Queue class is quite handy. Import it with from Queue import Queue, construct one, and pass the queue object to each thread. It supports multiple producers and consumers, and you can put almost any Python object into the queue: lists, objects, iterators, etc.

To transfer lots of data using this queue, just write one object at a time to the queue and use a generator function in the consumer that yields data out of the queue. Queues support a depth limit, in case the producer is faster than the consumer.
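For example, something along these lines (a rough sketch using the numbers from the question; the sentinel object is just one way of telling the consumer that the producer has finished):

from Queue import Queue
import threading

_DONE = object()                   # sentinel marking the end of the work

def producer(q):
    for i in xrange(2000000):
        q.put("%s\n" % i)          # blocks when the queue is at its depth limit
    q.put(_DONE)

def consume(q):
    # Generator that yields items off the queue until the sentinel arrives.
    while True:
        item = q.get()
        if item is _DONE:
            return
        yield item

work_queue = Queue(maxsize=1000)   # depth limit keeps memory use bounded
t = threading.Thread(target=producer, args=(work_queue,))
t.start()

for item in consume(work_queue):
    pass                           # do the real work on each item here

t.join()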

Upvotes: 0
