Bin Zhou

Reputation: 603

Use cx_Oracle and multiprocessing to query data concurrently

All,

I am trying to access and process a large chunk of data from an Oracle database, so I used the multiprocessing module to spawn 50 processes to access the database. To avoid opening 50 physical connections, I tried to use session pooling from cx_Oracle. The code is shown below. However, I always get an unpickling error. I know cx_Oracle has pickling issues, but I thought I could get around them by using a global variable. Could anyone help?

import sys
import cx_Oracle
import os
from multiprocessing import Pool

# Read a list of ids from the input file
def ReadList(inputFile):
        ............


def GetText(applId):
        global sPool
        connection = sPool.acquire()
        cur = connection.cursor()
        cur.prepare('Some Query')
        cur.execute(None, appl_id = applId)
        result = cur.fetchone()
        title = result[0]
        abstract = result[2].read()
        sa = result[3].read()
        cur.close()
        sPool.release(connection)
        return (title, abstract, sa)


if __name__ == '__main__':
        inputFile = sys.argv[1]
        ids = ReadList(inputFile)
        dsn = cx_Oracle.makedsn('xxx', ...)
        sPool=cx_Oracle.SessionPool(....., min=1, max=10, increment=1)
        pool = Pool(10)
        results = pool.map(GetText, ids)


Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 525, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 282, in _handle_results
    task = get()
UnpicklingError: NEWOBJ class argument has NULL tp_new

Upvotes: 1

Views: 3152

Answers (2)

ivan_pozdeev

Reputation: 36036

How do you expect 50 processes to share the same DB connection pool, when that pool is managed inside a single process?!
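One common way around this is to give each worker process its own connection, opened once through the `initializer` argument of `multiprocessing.Pool`. The following is only a minimal sketch of that pattern: `sqlite3` is used as a stand-in so the example runs anywhere, and the names `init_worker`, `get_text`, and `_conn` are hypothetical. With Oracle, the `sqlite3.connect(...)` line would presumably become something like a `cx_Oracle.connect(...)` call with your own credentials, and the query would be your own.

```python
import sqlite3
from multiprocessing import Pool

_conn = None  # one connection per worker process, set by init_worker()


def init_worker():
    """Runs once in each worker process and opens that process's own connection."""
    global _conn
    # Assumption: sqlite3 is a stand-in here for the real database driver.
    _conn = sqlite3.connect(":memory:")


def get_text(appl_id):
    """Query through the worker's own connection and return plain, picklable values."""
    cur = _conn.cursor()
    try:
        # Placeholder query; the original post elides the real one.
        cur.execute("SELECT ?, 'title-' || ?", (appl_id, appl_id))
        row = cur.fetchone()
    finally:
        cur.close()
    # Only built-in types cross the process boundary, so pickling stays simple.
    return (row[0], row[1])


if __name__ == "__main__":
    ids = [1, 2, 3]
    with Pool(processes=2, initializer=init_worker) as pool:
        print(pool.map(get_text, ids))
```

Nothing database-related is created in the parent and shared with the children here; the only things sent between processes are the ids and the tuples of plain values that `get_text` returns.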

Upvotes: 1

theoric

Reputation: 1

First of all, your code raises the error "NameError: global name 'sPool' is not defined", so sPool=cx_Oracle.SessionPool(....., min=1, max=10, increment=1) must be placed above def GetText(applId):

For me, this code started working properly after I changed from multiprocessing import Pool to from multiprocessing.dummy import Pool (which runs the workers as threads instead of processes) and added the parameter threaded=True to the cx_Oracle.SessionPool call: sPool=cx_Oracle.SessionPool(....., min=1, max=10, increment=1, threaded=True)
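The shape of that thread-based setup can be sketched as follows. This is an illustration under assumptions, not the exact code I ran: `StubSessionPool` is a hypothetical stand-in (built on `queue.Queue` and `sqlite3`) for `cx_Oracle.SessionPool(..., threaded=True)`, so that the example runs without an Oracle database, and the query is a placeholder.

```python
import queue
import sqlite3
from multiprocessing.dummy import Pool  # thread-backed Pool with the same API


class StubSessionPool:
    """Hypothetical stand-in for a thread-safe session pool."""

    def __init__(self, size):
        self._idle = queue.Queue()
        for _ in range(size):
            # check_same_thread=False lets a connection be handed between threads.
            self._idle.put(sqlite3.connect(":memory:", check_same_thread=False))

    def acquire(self):
        return self._idle.get()  # blocks until a connection is free

    def release(self, connection):
        self._idle.put(connection)


# Created at module level, above get_text, so every worker thread can see it.
s_pool = StubSessionPool(size=2)


def get_text(appl_id):
    connection = s_pool.acquire()
    try:
        cur = connection.cursor()
        # Placeholder query; the original post elides the real one.
        cur.execute("SELECT ?, 'title-' || ?", (appl_id, appl_id))
        row = cur.fetchone()
        cur.close()
        return (row[0], row[1])
    finally:
        s_pool.release(connection)  # always hand the session back


if __name__ == "__main__":
    with Pool(4) as pool:
        print(pool.map(get_text, [1, 2, 3, 4, 5]))
```

Because the workers are threads in one process, the pool object is shared directly and nothing has to be pickled. The trade-off is that CPU-heavy post-processing will not run in parallel the way it would with separate processes.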

Upvotes: 0
