Reputation: 6730
I'm trying to set something up where one thread is writing a list of work and another thread is reading the list and working from it. This list can be very large, so to stop the whole list being held in memory I want to have it written to a file (or any other way of preserving memory; generators, perhaps?).
I put together a little runnable example with a sleep in the writer so that the reader can catch up. I'm wondering how I can get the reader to not stop when it "overtakes" the writer. I looked at using .seek and .tell but I got weird behaviour and I'm not sure that's the right route.
As another question, is this at all a sensible idea? Maybe there's a much more elegant way I can queue up a list of strings without using loads of memory.
import threading, time

class Writer(threading.Thread):
    lock = threading.Lock()

    def __init__(self, file_path, size):
        threading.Thread.__init__(self)
        self.file_path = file_path
        self.size = size
        self.i = 0

    def how_many(self):
        with self.lock:
            print "Reader starting, writer is on", self.i

    def run(self):
        f = open(self.file_path, "w")
        for i in xrange(self.size):
            with self.lock:
                self.i = i
            if i % 1000 == 0:
                time.sleep(0.1)
            f.write("%s\n" % i)
        f.close()

class Reader(threading.Thread):
    def __init__(self, file_path):
        threading.Thread.__init__(self)
        self.file_path = file_path

    def run(self):
        f = open(self.file_path, "r")
        line = 0
        for line in f:
            pass
        print "Reader got to: %s" % line.strip()

if __name__ == "__main__":
    a = Writer("testfile", 2000000)
    b = Reader("testfile")
    a.start()
    time.sleep(1)
    a.how_many()
    b.start()
Upvotes: 2
Views: 2876
Reputation: 6730
I did solve this, using a buffered file queue where the queue is spread out between memory and file. Items are put into a Queue, but if the items in the queue exceed the specified queue size, any overflow is stored on file to preserve memory, and it still comes back out of the queue with get just the same.
If anyone is looking to do something similar, I put it on GitHub here.
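Roughly, the idea looks like the sketch below. This is not the code from the repository, just a minimal illustration assuming one producer thread and one consumer thread, and items that are strings without embedded newlines; the class name BufferedFileQueue and its maxsize parameter are made up for this example.

import tempfile
import threading
from Queue import Queue, Full

class BufferedFileQueue(object):
    """Sketch: keeps at most maxsize items in memory, spills the rest to a temp file."""

    def __init__(self, maxsize=1024):
        self.memory = Queue(maxsize)              # bounded in-memory part
        self.overflow = tempfile.TemporaryFile()  # on-disk part, one item per line
        self.read_pos = 0                         # where the next disk read starts
        self.write_pos = 0                        # where the next disk write starts
        self.lock = threading.Lock()

    def put(self, item):
        with self.lock:
            disk_pending = self.read_pos < self.write_pos
            if disk_pending or self.memory.full():
                # Once items have spilled to disk, keep appending there so FIFO order holds.
                self.overflow.seek(self.write_pos)
                self.overflow.write("%s\n" % item)
                self.write_pos = self.overflow.tell()
            else:
                self.memory.put_nowait(item)

    def get(self, block=True, timeout=None):
        with self.lock:
            if self.memory.empty() and self.read_pos < self.write_pos:
                # Refill the in-memory queue from the overflow file.
                self.overflow.seek(self.read_pos)
                while not self.memory.full():
                    line = self.overflow.readline()
                    if not line:
                        break
                    self.memory.put_nowait(line.rstrip("\n"))
                self.read_pos = self.overflow.tell()
        return self.memory.get(block, timeout)

The writer thread then just calls put() for each piece of work and the reader calls get(), exactly as it would with a normal Queue, while only maxsize items ever sit in memory.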
Upvotes: 2
Reputation: 1150
The multiprocessing JoinableQueue class is designed to allow limiting the backlog that can build up while waiting for child threads / processes to consume tasks. I'm going to assume you're reading work in from a file, and that the file is too large to easily hold in memory all at once.
The following is my attempt at a solution that should limit memory usage. In this example I'm processing a newline-terminated series of dates, converting them into a standard format, and writing them back out to a new file.
I'm by no means an expert with the multiprocessing module, so if anyone sees a mistake or a better way to do it, I would like to hear it.
from multiprocessing import Process, Queue, JoinableQueue
import time

date_formats = [
    "%Y%m",
    "%Y-%m-%d",
    "%y-%m-%d",
    "%y%m%d",
    "%Y%m%d",
    "%m/%d/%Y",
    "%m/%d/%y",
    "%m/%d/%Y %H:%M",
    "%m%d%y",
    "%m%d%Y",
    "%B, %d %Y",
    "%B, %d %y",
    "%d %B, %Y",
    "%d %B, %y",
    "%B %d %Y",
    "%B %d %y",
    "%B %d, %Y",
    "%B %d, %y",
    "%B %d %Y",
    "%B %d %y",
    "%b %d %Y",
    "%b %d, %Y",
    "%b %d %y",
    "%b %d, %y",
    "%d-%b-%y",
    "%Y-%m-%d %H:%M:%S"
]

def convert_date(date):
    date = date.strip()
    for dateformat in date_formats:
        try:
            converted = time.strptime(date, dateformat)
            converted = time.strftime("%Y-%m-%d", converted)
            return converted
        except ValueError:
            continue

def writer(result_queue):
    f = open("iso_dates.out", "wb")
    while True:
        try:
            date = result_queue.get(timeout=1)
            f.write(date + '\n')
        except:
            break
    f.close()
def worker(work_queue, result_queue):
    while True:
        date = work_queue.get()
        if not date:
            break
        converted = convert_date(date)
        if converted is not None:  # skip dates that match none of the known formats
            result_queue.put(converted)
        work_queue.task_done()
dates = open("dates.out", "rb")
work_queue = JoinableQueue(512)  # allow no more than 512 items on queue
result_queue = Queue()
writer_proc = Process(target=writer, args=(result_queue,))

worker_procs = 2
for i in range(worker_procs):
    p = Process(target=worker, args=(work_queue, result_queue))
    p.daemon = True
    p.start()

writer_proc.start()

for date in dates:
    work_queue.put(date)  # will block until tasks are consumed if maxsize is encountered
work_queue.join()
dates.close()
Upvotes: 0
Reputation: 19347
For sending messages between threads the Queue class is quite handy. Import it with from Queue import Queue, construct one, and pass the queue object to each thread. It supports multiple producers and consumers, and you can put almost any Python object into the queue: lists, objects, iterators, etc.
To transfer lots of data using this queue, just write one object at a time to the queue and use a generator function in the consumer that yields data out of the queue. Queues support a depth limit, in case the producer is faster than the consumer.
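A minimal sketch of what that could look like, in the same Python 2 style as the question: the names producer and consume, and the SENTINEL end-of-stream marker, are just choices for this example, not anything required by the Queue class.

from Queue import Queue
import threading

SENTINEL = None  # marks the end of the stream for this example

def producer(queue, n):
    # Write one item at a time; put() blocks whenever the queue is full,
    # so no more than maxsize items are ever held in memory.
    for i in xrange(n):
        queue.put("%s\n" % i)
    queue.put(SENTINEL)

def consume(queue):
    # Generator in the consumer that yields data out of the queue
    # until the sentinel arrives.
    while True:
        item = queue.get()
        if item is SENTINEL:
            return
        yield item

if __name__ == "__main__":
    q = Queue(maxsize=1000)  # depth limit in case the producer is faster
    t = threading.Thread(target=producer, args=(q, 2000000))
    t.start()
    last = None
    for line in consume(q):  # the reader works straight off the queue
        last = line
    t.join()
    print "Reader got to: %s" % last.strip()

This keeps at most maxsize lines in memory at any time, with no intermediate file involved at all.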
Upvotes: 0