Reputation: 603
All,
I am trying to access and process a large chunk of data from an Oracle database, so I used the multiprocessing module to spawn 50 processes to access the database. To avoid opening 50 physical connections, I tried to use session pooling from cx_Oracle. The code is shown below. However, I always get an unpickling error. I know cx_Oracle has pickling issues, but I thought I could get around them by using a global variable. Could anyone help?
import sys
import cx_Oracle
import os
from multiprocessing import Pool

# Read a list of ids from the input file
def ReadList(inputFile):
    ............

def GetText(applId):
    global sPool
    connection = sPool.acquire()
    cur = connection.cursor()
    cur.prepare('Some Query')
    cur.execute(None, appl_id = applId)
    result = cur.fetchone()
    title = result[0]
    abstract = result[2].read()
    sa = result[3].read()
    cur.close()
    sPool.release(connection)
    return (title, abstract, sa)

if __name__=='__main__':
    inputFile = sys.argv[1]
    ids = ReadList(inputFile)
    dsn = cx_Oracle.makedsn('xxx', ...)
    sPool=cx_Oracle.SessionPool(....., min=1, max=10, increment=1)
    pool = Pool(10)
    results = pool.map(GetText, ids)
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 525, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 282, in _handle_results
    task = get()
UnpicklingError: NEWOBJ class argument has NULL tp_new
Upvotes: 1
Views: 3152
Reputation: 36036
How are you expecting 50 processes to use the same, intra-process-managed DB connection (pool)?!
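If separate processes are really needed, one common pattern is to let each worker process open its own connection in a Pool initializer, so that no connection object ever has to cross a process boundary. The sketch below only illustrates that shape: FakeConnection, init_worker and get_text are hypothetical names, and FakeConnection stands in for a real cx_Oracle connection so the example runs without a database.

```python
import os
from multiprocessing import Pool

_connection = None  # one per worker process, set by init_worker()


class FakeConnection(object):
    """Hypothetical stand-in for a cx_Oracle connection (assumption)."""

    def __init__(self):
        # Remember which process created this connection.
        self.owner_pid = os.getpid()

    def fetch_title(self, appl_id):
        # A real worker would run the query here.
        return "title for %s" % appl_id


def init_worker():
    # Runs once inside each worker process, so the connection is
    # created where it is used and never needs to be pickled.
    global _connection
    _connection = FakeConnection()


def get_text(appl_id):
    # Only plain, picklable values are sent back to the parent.
    return (appl_id, _connection.fetch_title(appl_id), _connection.owner_pid)


if __name__ == '__main__':
    pool = Pool(4, initializer=init_worker)
    try:
        rows = pool.map(get_text, [101, 102, 103])
    finally:
        pool.close()
        pool.join()
    print(rows)
```

With this layout the number of physical connections equals the number of worker processes, which is the trade-off compared with a shared session pool.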
Upvotes: 1
Reputation: 1
First of all, your code raises the error "NameError: global name 'sPool' is not defined", so the line
sPool=cx_Oracle.SessionPool(....., min=1, max=10, increment=1)
must be placed above def GetText(applId):
For me, the code started working properly after changing
from multiprocessing import Pool
to
from multiprocessing.dummy import Pool
and adding the parameter threaded=True to the cx_Oracle.SessionPool call:
sPool=cx_Oracle.SessionPool(....., min=1, max=10, increment=1, threaded=True)
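A minimal sketch of the thread-based layout described above, runnable without a database. FakeSessionPool is a hypothetical stand-in for cx_Oracle.SessionPool that only mimics acquire() and release(); with the real class, the pool would be created with threaded=True as noted above. The run() helper is also an illustrative name, not part of the original code.

```python
import threading
from multiprocessing.dummy import Pool  # thread-backed Pool, same API


class FakeSessionPool(object):
    """Hypothetical stand-in for cx_Oracle.SessionPool (assumption)."""

    def __init__(self, max_sessions):
        self._slots = threading.Semaphore(max_sessions)
        self._lock = threading.Lock()
        self._next_id = 0

    def acquire(self):
        self._slots.acquire()  # block until a session is free
        with self._lock:
            self._next_id += 1
            return "session-%d" % self._next_id

    def release(self, connection):
        self._slots.release()


# Defined at module level, above the worker function that uses it.
sPool = FakeSessionPool(max_sessions=10)


def GetText(applId):
    connection = sPool.acquire()
    try:
        # A real worker would run the query on `connection` here.
        return (applId, "title for %s" % applId)
    finally:
        sPool.release(connection)  # always hand the session back


def run(ids):
    pool = Pool(10)  # 10 threads sharing one session pool
    try:
        return pool.map(GetText, ids)
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    print(run([101, 102, 103]))
```

Because the workers are threads in a single process, they can share the one pool object directly and nothing has to be pickled.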
Upvotes: 0