Simon Nicholls

Reputation: 655

Python cx_Oracle multithreading not working with a cursor per thread

I am trying to run completely separate Oracle queries in parallel using cx_Oracle in Python.

I can make this work by setting up a new database connection per thread and running the queries in those separate threads. This brings the total time down from around 2 minutes to 1 minute 20 seconds, so it's definitely working. Timings for queries:

START_TIME                      END_TIME
17-FEB-16 22.33.28.000000000    17-FEB-16 22.33.30.000000000
17-FEB-16 22.33.30.000000000    17-FEB-16 22.33.33.000000000
17-FEB-16 22.33.33.000000000    17-FEB-16 22.33.36.000000000
17-FEB-16 22.33.36.000000000    17-FEB-16 22.33.36.000000000
17-FEB-16 22.33.36.000000000    17-FEB-16 22.34.08.000000000
17-FEB-16 22.34.08.000000000    17-FEB-16 22.34.26.000000000
17-FEB-16 22.34.26.000000000    17-FEB-16 22.34.27.000000000
17-FEB-16 22.34.27.000000000    17-FEB-16 22.34.29.000000000

There is an overhead, however, in setting up a database connection in each thread, and I'm pretty sure I should be able to create a new cursor for each thread and share the connection, as shown here:

http://www.oracle.com/technetwork/articles/vasiliev-python-concurrency-087536.html

When I share the connection and use a separate cursor per thread, however, the queries all start at the same time and end at the same time. So although the threads are being spawned, the queries still appear to run sequentially on the database. Timings for queries:

START_TIME                      END_TIME
17-FEB-16 22.36.32.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.32.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.32.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000    17-FEB-16 22.38.21.000000000

Code for multi connections:

for file in file_transporter.complete_file_list:
    # Get the database wrapper and open a dedicated connection for this thread
    the_db = shared_lib_wrapper.get_oracle().Oracle(the_logger)
    the_db.connect(conn_str())
    # Create a new thread that runs Loader on its own connection
    thread = threading.Thread(target=Loader, args=(params, the_date, the_logger, the_db, file, file_transporter.complete_file_list[file]))
    the_logger.info("Running Thread: " + thread.getName())
    thread.start()
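The connection-per-thread approach above can be sketched generically for any DB-API 2.0 driver. In this sketch `run_in_parallel` and `run_on_own_connection` are hypothetical helpers, `sqlite3` stands in for cx_Oracle so it runs without a database server, and the caller passes a `connect` factory. With cx_Oracle that factory might be `functools.partial(cx_Oracle.connect, conn_str(), threaded=True)` (an assumption; note also that Oracle uses named binds such as `:name` rather than sqlite3's `?`). Each thread opens, uses and closes its own connection, and the main thread joins them all before returning.

```python
import sqlite3
import threading


def run_on_own_connection(connect, sql, params, results, index):
    """Open a private connection, run one query, store its rows, then close."""
    conn = connect()
    try:
        cursor = conn.cursor()
        cursor.execute(sql, params)
        results[index] = cursor.fetchall()
        cursor.close()
    finally:
        conn.close()


def run_in_parallel(connect, jobs):
    """Start one thread (and therefore one connection) per (sql, params) job."""
    results = [None] * len(jobs)
    threads = []
    for index, (sql, params) in enumerate(jobs):
        thread = threading.Thread(
            target=run_on_own_connection,
            args=(connect, sql, params, results, index),
        )
        threads.append(thread)
        thread.start()
    # Wait for every thread so the caller knows when all queries have finished
    for thread in threads:
        thread.join()
    return results


if __name__ == "__main__":
    # sqlite3 is only a stand-in driver here (assumption); each thread gets its own in-memory DB
    jobs = [("SELECT ? * 2", (n,)) for n in range(5)]
    print(run_in_parallel(lambda: sqlite3.connect(":memory:"), jobs))
```

Passing a factory instead of an open connection keeps the rule "one connection per thread" in one place: the worker creates the connection inside its own thread, so it is never shared.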

Code for multiple cursors (runLoad calls a function that creates a new cursor and executes a procedure; see below):

for file in self.file_list:
    file_parameters = self.file_list[file]
    function_to_run = file_parameters['LOAD_PACKAGE'] + '.' + file_parameters['LOAD_FUNCTION']

    # Create a new thread; runLoad opens a new cursor on the shared connection
    thread = threading.Thread(target=self.runLoad, args=(file_parameters['RUN_ID'], function_to_run))
    self.log.info("Spawned Thread: " + thread.getName())
    self.log.info("Running Thread: " + thread.getName())
    thread.start()

Code that creates the cursor:

def execute_stored_proc_with_in_and_out_params(self, proc_name, params, dbms_logging=False):
    try:
        cursor = cx_Oracle.Cursor(self.db_conn

My questions therefore are:

1) Am I doing something wrong when creating the cursors? If so, any ideas on how to fix it? I've read that cx_Oracle has a threadsafety level of 2:

Currently 2, which means that threads may share the module and connections, but not cursors.
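The `threadsafety` value quoted above is the module attribute defined by PEP 249 (the Python DB-API 2.0), which specifies four levels. The sketch below just encodes that table; the names `THREADSAFETY_LEVELS` and `may_share` are illustrative and not part of cx_Oracle. It shows that a shared connection with one cursor per thread is permitted at level 2. PEP 249 defines "sharing" only as threads using a resource without wrapping it in their own mutex, so it says nothing about whether the shared connection executes work concurrently.

```python
# PEP 249 (DB-API 2.0) meanings of a driver module's `threadsafety` attribute.
THREADSAFETY_LEVELS = {
    0: frozenset(),                                    # threads may not share the module
    1: frozenset({"module"}),                          # may share the module, not connections
    2: frozenset({"module", "connection"}),            # may share module and connections
    3: frozenset({"module", "connection", "cursor"}),  # may share module, connections and cursors
}


def may_share(threadsafety, resource):
    """Return True if a DB-API `threadsafety` level lets threads share `resource`."""
    return resource in THREADSAFETY_LEVELS[threadsafety]


if __name__ == "__main__":
    # With cx_Oracle installed you could pass cx_Oracle.threadsafety instead of the literal 2
    for resource in ("module", "connection", "cursor"):
        print(resource, may_share(2, resource))
```

Under level 2, giving each thread its own cursor on one connection is within the rules; sharing a single cursor between threads is not.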

2) If I can't share the connection, is there anything wrong with creating a new one for each thread? It still seems to give me a performance boost, even with the overhead of creating each connection.

Upvotes: 4

Views: 9688

Answers (2)

Anthony Tuininga

Reputation: 7096

Please see below a working implementation of a program that uses the same connection but a separate cursor in each thread. The procedure I am calling is in the cx_Oracle test suite (part of the 5.2.1 release) and is very simple, so the example calls it multiple times (a random number of times in each thread). The output clearly shows that the threads do not finish at the same time.

from __future__ import print_function

import cx_Oracle
import datetime
import random
import threading

# One connection shared by all threads; threaded=True asks Oracle to wrap
# access to the connection in a mutex so several threads can use it
connection = cx_Oracle.Connection("cx_Oracle/dev", threaded=True)

def TestThread(threadNum):
    startTime = datetime.datetime.today()
    # Each thread creates its own cursor on the shared connection
    cursor = connection.cursor()
    numInputs = int(random.random() * 5000)
    print("Thread", threadNum, "with", numInputs, "inputs:", startTime)
    for i in range(numInputs):
        value = bool(int(random.random() * 2))
        cursor.callfunc("pkg_TestBooleans.GetStringRep", str, (value,))
    endTime = datetime.datetime.today()
    print("Thread", threadNum, "with", numInputs, "inputs:", endTime)

threads = []
for i in range(8):
    thread = threading.Thread(target=TestThread, args=(i + 1,))
    threads.append(thread)
    thread.start()
print("All threads spawned...waiting for them to complete...")
for thread in threads:
    thread.join()

Output is as follows:

Thread 1 with 3405 inputs: 2016-02-22 07:55:07.849127
Thread 2 with 2706 inputs: 2016-02-22 07:55:07.849998
Thread 3 with 4101 inputs: 2016-02-22 07:55:07.850256
Thread 4 with 2912 inputs: 2016-02-22 07:55:07.850937
Thread 5 with 3747 inputs: 2016-02-22 07:55:07.851275
Thread 6 with 4318 inputs: 2016-02-22 07:55:07.851534
Thread 7 with 1453 inputs: 2016-02-22 07:55:07.852649
Thread 8 with 3304 inputs: 2016-02-22 07:55:07.853090
All threads spawned...waiting for them to complete...
Thread 7 with 1453 inputs: 2016-02-22 07:55:09.897217
Thread 2 with 2706 inputs: 2016-02-22 07:55:11.446744
Thread 4 with 2912 inputs: 2016-02-22 07:55:11.681414
Thread 8 with 3304 inputs: 2016-02-22 07:55:12.016809
Thread 1 with 3405 inputs: 2016-02-22 07:55:12.081846
Thread 5 with 3747 inputs: 2016-02-22 07:55:12.266111
Thread 3 with 4101 inputs: 2016-02-22 07:55:12.375623
Thread 6 with 4318 inputs: 2016-02-22 07:55:12.409352

UPDATE: As noted in the comments, a connection only performs one activity at a time. So even though using multiple cursors from the same connection in multiple threads is thread-safe, it doesn't actually improve concurrency. To get real concurrency you must use a separate connection for each thread.
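The UPDATE can be illustrated with a pure-Python simulation; it does not use cx_Oracle at all. The assumption, taken from the UPDATE, is that a connection handles one call at a time: the hypothetical `FakeConnection` models that with a lock, and `ConcurrencyTracker` records how many calls are in progress at once. With one shared connection the peak is always 1, however many threads (or cursors) are used, while one connection per thread lets the calls overlap.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyTracker:
    """Records the peak number of calls running at the same moment."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def enter(self):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)

    def leave(self):
        with self._lock:
            self.active -= 1


class FakeConnection:
    """Stand-in connection that, by assumption, serves one call at a time."""

    def __init__(self, tracker):
        self._lock = threading.Lock()  # models the per-connection serialization
        self._tracker = tracker

    def execute(self, seconds):
        with self._lock:
            self._tracker.enter()
            try:
                time.sleep(seconds)  # pretend the query takes this long
            finally:
                self._tracker.leave()


def peak_concurrency(shared, workers=4, seconds=0.2):
    """Run `workers` fake queries and report how many ever overlapped."""
    tracker = ConcurrencyTracker()
    shared_conn = FakeConnection(tracker) if shared else None

    def job(_):
        # shared=True: all threads go through one connection, like one cursor per thread
        conn = shared_conn if shared else FakeConnection(tracker)
        conn.execute(seconds)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(job, range(workers)))
    return tracker.peak


if __name__ == "__main__":
    print("shared connection, peak overlap:", peak_concurrency(shared=True))
    print("connection per thread, peak overlap:", peak_concurrency(shared=False))
```

This mirrors the question's second timing table: the threads start together, but the work behind a single connection is serialized.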

Upvotes: 6

Vlad Bezden

Reputation: 89685

from concurrent.futures import ThreadPoolExecutor, as_completed
import cx_Oracle
import datetime

CONN_INFO = {
    'host': 'xxx.xx.xx.x',
    'port': 1521,
    'user': 'user_name',
    'psw': 'password',
    'service': 'abc.xyz.com',
}

CONN_STR = '{user}/{psw}@{host}:{port}/{service}'.format(**CONN_INFO)

# your long running query (table and column names are placeholders;
# DATE is a reserved word in Oracle, so it is avoided as a column or bind name)
QUERY = 'SELECT * FROM customer WHERE created_date = :query_date'

def run(date):
    # Each call opens its own connection, so the queries really run in parallel
    conn = cx_Oracle.connect(CONN_STR, threaded=True)
    try:
        cursor = conn.cursor()
        cursor.execute(QUERY, {'query_date': date})
        data = cursor.fetchall()
        cursor.close()
    finally:
        conn.close()

    return data

def main():
    dates = [datetime.datetime.today() - datetime.timedelta(days=x) for x in range(0, 30)]
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(run, d) for d in dates]

        for future in as_completed(futures):
            # process your records from each thread, e.g. process_records(future.result())
            records = future.result()
            print(len(records), 'rows')


if __name__ == '__main__':
    main()
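`run()` above opens and closes a new connection for each of the 30 dates, which is the per-connection overhead raised in the question. One common way to amortize it is to open a fixed number of connections once and reuse them. cx_Oracle ships its own `SessionPool` class for this; the sketch below is instead a generic, hypothetical `SimplePool` built on `queue.Queue`, with `sqlite3` as a stand-in driver so it runs without Oracle. Each task borrows a connection, uses it, and hands it back, so any number of tasks on 4 workers only ever opens 4 connections.

```python
import queue
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


class SimplePool:
    """Hypothetical fixed-size pool: connections are opened once and reused."""

    def __init__(self, connect, size):
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(connect())  # pay the connection overhead only `size` times

    @contextmanager
    def connection(self):
        conn = self._idle.get()  # blocks until some connection is free
        try:
            yield conn
        finally:
            self._idle.put(conn)  # hand it back for the next task

    def close_all(self):
        # Call only after all tasks finish; borrowed connections are not closed here
        while True:
            try:
                conn = self._idle.get_nowait()
            except queue.Empty:
                break
            conn.close()


if __name__ == "__main__":
    # sqlite3 stands in for cx_Oracle (assumption); check_same_thread=False lets a
    # connection created in the main thread be used by one worker at a time
    pool = SimplePool(lambda: sqlite3.connect(":memory:", check_same_thread=False), size=4)

    def task(n):
        with pool.connection() as conn:
            return conn.execute("SELECT ? + 1", (n,)).fetchone()[0]

    with ThreadPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(task, range(8))))
    pool.close_all()
```

Sizing the pool to match `max_workers` means no worker waits for a connection, while each connection is still used by only one thread at a time.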

Upvotes: 0
