Reputation: 10850
I have a pretty standard multiprocessing script that processes 2 million records in a database table. Before I even finish putting work into the worker_queue, memory usage balloons to over 12 GB and the script crashes. Is there a better way to design this?
import math
import psycopg2
from psycopg2.extras import DictCursor
from multiprocessing import Process, Manager
from config import DB

connection = psycopg2.connect(DB)
cursor = connection.cursor(cursor_factory=DictCursor)

def worker(worker_queue, progress):
    for row in iter(worker_queue.get, None):
        # Do work
        progress.put(1)

if __name__ == "__main__":
    total, done = 0, 0
    cursor.execute("SELECT * from table")

    manager = Manager()
    worker_queue = manager.Queue()
    progress = manager.Queue()

    for row in cursor:
        worker_queue.put(row)
        total += 1

    workers = [Process(target=worker, args=(worker_queue, progress)) for i in range(50)]
    for each in workers:
        each.start()

    for i in iter(progress.get, None):
        done += 1
        remaining = total - done
        if remaining == 0:
            print 'Done'
        elif ((remaining % (10 ** int(math.log10(remaining)))) == 0):
            print str(remaining) + ' remaining'
Upvotes: 1
Views: 1177
Reputation: 56477
Two things worth noting:
1) Don't use select *. There are two reasons for that: first, you load more data than you probably need; second, you have no control over the order of the columns (that will be important once we get to point 2).
2) Don't use DictCursor. It turns each row into a dict, which eats a lot of memory (since you effectively duplicate the column names in every dict). Use the default cursor_factory instead. Then, in order to know the order of the fields returned in each tuple, you have to specify that order in your select query.
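For illustration, points 1) and 2) together might look like the sketch below. The column names id, name, value are placeholders, not your real schema; substitute your actual columns.

import psycopg2
from config import DB

connection = psycopg2.connect(DB)
# Default cursor_factory: each row comes back as a plain tuple, not a dict
cursor = connection.cursor()

# Name the columns explicitly so the order inside each tuple is known
cursor.execute("SELECT id, name, value FROM table")
for row in cursor:
    record_id, name, value = row  # order matches the SELECT list
    # Do work with the row here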
That should take care of your problem. If it does not, then you have to do the job over a smaller set of data at a time.
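One way to work over smaller sets of data, if it comes to that, is a psycopg2 server-side (named) cursor, which streams rows in batches instead of pulling the whole result set at once. This is only a sketch of that idea (same placeholder column names as above), not something the answer prescribes.

import psycopg2
from config import DB

connection = psycopg2.connect(DB)
# A named cursor is server-side: rows are fetched in batches of itersize
# rather than all 2 million at once
cursor = connection.cursor(name='batched_read')
cursor.itersize = 2000  # arbitrary batch size; tune for your workload

cursor.execute("SELECT id, name, value FROM table")
for row in cursor:
    # Hand the row to a worker here; a bounded queue (e.g. maxsize=...)
    # would also keep the producer from outrunning the workers
    pass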
Upvotes: 2