Reputation: 467
Goal:
Current state:
Hence the idea to use multiprocessing: I want to be able to write the output concurrently, so that the work ends up I/O bound rather than CPU bound.
Background aside, here's the issue (which is essentially a design question) - the multiprocessing library works by pickling objects and then piping the data to other spawned processes; but the ResultProxy objects and shared Queue that I'm trying to use in the WriteWorker Process aren't picklable, which results in the following message (not verbatim, but close enough):
pickle.PicklingError: Can't pickle object in WriteWorker.start()
So the question for you helpful folks is: any ideas on a design pattern or approach that would avoid this issue? This seems like a simple, classic producer-consumer problem; I imagine the solution is straightforward and I'm just overthinking it.
any help or feedback is appreciated! thanks :)
edit: here are some relevant snippets of code; let me know if there's any other context I can provide.
from the parent class:
# init manager and queues (in __init__)
self.manager = multiprocessing.Manager()
self.query_queue = self.manager.Queue()
self.write_queue = self.manager.Queue()

def _get_data(self):
    # spawn a pool of query processes, and pass them the query queue instance
    for i in xrange(self.NUM_QUERY_THREADS):
        qt = QueryWorker.QueryWorker(self.query_queue, self.write_queue, self.config_values, self.args)
        qt.daemon = True
        qt.start()

    # populate the query queue
    self.parse_sql_queries()

    # spawn a pool of writer processes, and pass them the output queue instance
    for i in range(self.NUM_WRITE_THREADS):
        wt = WriteWorker.WriteWorker(self.write_queue, self.output_path, self.WRITE_BUFFER, self.output_dict)
        wt.daemon = True
        wt.start()

    # wait on the queues until everything has been processed
    self.query_queue.join()
    self.write_queue.join()
and from the QueryWorker class:
def run(self):
    while True:
        # grab a (table, query, query_num) tuple from the query queue
        table, query, query_num = self.query_queue.get()
        if query and table:
            # grab a connection from the pool and run the query
            connection = self.engine.connect()
            print 'Running query #' + str(query_num) + ': ' + table
            try:
                result = connection.execute(query)
            except Exception:
                print 'Error while running query #' + str(query_num) + ': \n\t' + str(query) + '\nError: ' + str(sys.exc_info()[1])
            else:
                # place the result handle tuple onto the output queue
                self.out_queue.put((table, result))
        # signal to the queue that this job is done
        self.query_queue.task_done()
Upvotes: 2
Views: 1890
Reputation: 11
The simple answer is to avoid putting the ResultProxy itself onto the queue. Instead, fetch the data from the ResultProxy with fetchall() or fetchmany(number_to_fetch) inside the query process, and pass the plain row data into the multiprocessing queue.
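For illustration, here is a minimal sketch of how the QueryWorker.run() loop from the question might be adapted along those lines. It assumes the self.engine, self.out_queue, and self.query_queue attributes shown in the question's snippets; the only change is that the rows are materialized in the query worker and converted to plain tuples before being queued, so nothing unpicklable ever crosses the process boundary.

def run(self):
    while True:
        table, query, query_num = self.query_queue.get()
        if query and table:
            connection = self.engine.connect()
            try:
                result = connection.execute(query)
                # materialize the rows here, in the query process;
                # plain tuples pickle cleanly, a ResultProxy does not
                rows = [tuple(row) for row in result.fetchall()]
                self.out_queue.put((table, rows))
            except Exception:
                print 'Error while running query #' + str(query_num) + ': ' + str(sys.exc_info()[1])
            finally:
                connection.close()
        self.query_queue.task_done()

For very large result sets, calling fetchmany(number_to_fetch) in a loop and putting each chunk onto the queue avoids holding the entire result in memory at once.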
Upvotes: 1