Reputation: 655
I am trying to run completely separate Oracle queries in parallel using cx_Oracle in Python.
I can make this work by opening a new database connection per thread and running each query in its own thread. This brings the total time down from around 2 minutes to 1 minute 20 seconds, so it is definitely working. Timings for the queries:
START_TIME END_TIME
17-FEB-16 22.33.28.000000000 17-FEB-16 22.33.30.000000000
17-FEB-16 22.33.30.000000000 17-FEB-16 22.33.33.000000000
17-FEB-16 22.33.33.000000000 17-FEB-16 22.33.36.000000000
17-FEB-16 22.33.36.000000000 17-FEB-16 22.33.36.000000000
17-FEB-16 22.33.36.000000000 17-FEB-16 22.34.08.000000000
17-FEB-16 22.34.08.000000000 17-FEB-16 22.34.26.000000000
17-FEB-16 22.34.26.000000000 17-FEB-16 22.34.27.000000000
17-FEB-16 22.34.27.000000000 17-FEB-16 22.34.29.000000000
However, setting up a database connection in each thread has an overhead, and I am fairly sure I should be able to share one connection and create a new cursor for each thread, as shown here:
http://www.oracle.com/technetwork/articles/vasiliev-python-concurrency-087536.html
When I share the connection and use a separate cursor per thread, however, the queries all start at the same time and all end at the same time. It looks as though the threads are being spawned, but the queries are still running sequentially on the database. Timings for the queries:
START_TIME END_TIME
17-FEB-16 22.36.32.000000000 17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.32.000000000 17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.32.000000000 17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000 17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000 17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000 17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000 17-FEB-16 22.38.21.000000000
Code for multi connections:
    for file in file_transporter.complete_file_list:
        # Get database and open connection
        the_db = shared_lib_wrapper.get_oracle().Oracle(the_logger)
        the_db.connect(conn_str())
        # Create new thread
        thread = threading.Thread(target=Loader, args=(params, the_date, the_logger, the_db, file, file_transporter.complete_file_list[file]))
        the_logger.info("Running Thread: " + thread.getName())
        thread.start()
Code for multi cursors (within runLoad there is a function that creates a new cursor and executes a procedure - see below):
    for file in self.file_list:
        file_parametes = self.file_list[file]
        function_to_run = file_parametes['LOAD_PACKAGE'] + '.' + file_parametes['LOAD_FUNCTION']
        # Create new thread
        thread = threading.Thread(target=self.runLoad, args=(file_parametes['RUN_ID'], function_to_run))
        self.log.info("Spawned Thread: " + thread.getName())
        self.log.info("Running Thread: " + thread.getName())
        thread.start()
Code that creates cursor:
    def execute_stored_proc_with_in_and_out_params(self, proc_name, params, dbms_logging=False):
        try:
            cursor = cx_Oracle.Cursor(self.db_conn
My questions therefore are:
1) Am I doing something wrong when creating the cursors? If so, any ideas on how to fix it? I have read that cx_Oracle has threadsafety level 2:
"Currently 2, which means that threads may share the module and connections, but not cursors."
2) If I can't share the connection, is there anything wrong with creating a new one for each thread? It still seems to give me a performance boost, even with the overhead of creating each connection.
Upvotes: 4
Views: 9688
Reputation: 7096
Below is a working implementation of a program that uses the same connection but a separate cursor in each thread. The procedure I am calling comes from the cx_Oracle test cases (part of the 5.2.1 release) and is very simple, so in the example each thread calls it a random number of times. The output clearly shows that the threads do not finish at the same time.
    from __future__ import print_function

    import cx_Oracle
    import datetime
    import random
    import threading

    connection = cx_Oracle.Connection("cx_Oracle/dev", threaded = True)

    def TestThread(threadNum):
        startTime = datetime.datetime.today()
        cursor = connection.cursor()
        numInputs = int(random.random() * 5000)
        print("Thread", threadNum, "with", numInputs, "inputs:", startTime)
        for i in range(numInputs):
            value = bool(int(random.random() * 2))
            cursor.callfunc("pkg_TestBooleans.GetStringRep", str, (value,))
        endTime = datetime.datetime.today()
        print("Thread", threadNum, "with", numInputs, "inputs:", endTime)

    threads = []
    for i in range(8):
        thread = threading.Thread(target = TestThread, args = (i + 1,))
        threads.append(thread)
        thread.start()
    print("All threads spawned...waiting for them to complete...")
    for thread in threads:
        thread.join()
Output is as follows:
Thread 1 with 3405 inputs: 2016-02-22 07:55:07.849127
Thread 2 with 2706 inputs: 2016-02-22 07:55:07.849998
Thread 3 with 4101 inputs: 2016-02-22 07:55:07.850256
Thread 4 with 2912 inputs: 2016-02-22 07:55:07.850937
Thread 5 with 3747 inputs: 2016-02-22 07:55:07.851275
Thread 6 with 4318 inputs: 2016-02-22 07:55:07.851534
Thread 7 with 1453 inputs: 2016-02-22 07:55:07.852649
Thread 8 with 3304 inputs: 2016-02-22 07:55:07.853090
All threads spawned...waiting for them to complete...
Thread 7 with 1453 inputs: 2016-02-22 07:55:09.897217
Thread 2 with 2706 inputs: 2016-02-22 07:55:11.446744
Thread 4 with 2912 inputs: 2016-02-22 07:55:11.681414
Thread 8 with 3304 inputs: 2016-02-22 07:55:12.016809
Thread 1 with 3405 inputs: 2016-02-22 07:55:12.081846
Thread 5 with 3747 inputs: 2016-02-22 07:55:12.266111
Thread 3 with 4101 inputs: 2016-02-22 07:55:12.375623
Thread 6 with 4318 inputs: 2016-02-22 07:55:12.409352
UPDATE: as noted in the comments, a connection only performs one activity at a time. So even though using multiple cursors from the same connection in multiple threads is thread safe, it doesn't actually improve concurrency. To do that you must use a separate connection.
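The effect described in the update can be illustrated without a database. The following is only a minimal simulation, not cx_Oracle code: `FakeConnection` is a hypothetical stand-in whose lock models the assumption that a connection processes one call at a time, and `time.sleep` stands in for the time a query spends on the server.

```python
import threading
import time


class FakeConnection:
    """Hypothetical stand-in for a database connection (not a cx_Oracle class).

    The lock models the assumption that one connection handles only one
    call at a time, however many cursors are open on it.
    """

    def __init__(self):
        self._busy = threading.Lock()

    def run_query(self, seconds):
        with self._busy:         # other threads using this connection wait here
            time.sleep(seconds)  # stands in for the database doing the work


def timed_run(connections, seconds=0.2):
    """Run one simulated query per entry in `connections`, one thread each,
    and return the elapsed wall-clock time in seconds."""
    threads = [threading.Thread(target=c.run_query, args=(seconds,))
               for c in connections]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


# Four threads sharing one connection.
shared = FakeConnection()
shared_elapsed = timed_run([shared] * 4)

# Four threads, each with its own connection.
separate_elapsed = timed_run([FakeConnection() for _ in range(4)])

print("shared connection:    %.2fs" % shared_elapsed)
print("separate connections: %.2fs" % separate_elapsed)
```

In this simulation the shared case takes roughly the sum of the query times, while the separate case takes roughly as long as a single query, which is consistent with the behaviour reported in the question.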
Upvotes: 6
Reputation: 89685
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import cx_Oracle
    import datetime

    CONN_INFO = {
        'host': 'xxx.xx.xx.x',
        'port': 99999,
        'user': 'user_name',
        'psw': 'password',
        'service': 'abc.xyz.com',
    }
    CONN_STR = '{user}/{psw}@{host}:{port}/{service}'.format(**CONN_INFO)

    # your long running query
    # (the bind variable is named query_date because DATE is a reserved
    # word in Oracle and cannot be used as a bind variable name)
    QUERY = 'SELECT * FROM customer WHERE date = :query_date'

    def run(date):
        # each call opens its own connection so the queries can run in parallel
        conn = cx_Oracle.connect(CONN_STR, threaded=True)
        try:
            cursor = conn.cursor()
            cursor.execute(QUERY, {'query_date': date})
            data = cursor.fetchall()
            cursor.close()
            return data
        finally:
            conn.close()

    def main():
        dates = [datetime.datetime.today() - datetime.timedelta(days=x) for x in range(0, 30)]
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(run, d) for d in dates]
            for future in as_completed(futures):
                # process your records from each thread
                # process_records(future.result())
                records = future.result()

    if __name__ == '__main__':
        main()
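One possible way to reduce the connection overhead raised in the question is to open one connection per worker thread and reuse it across tasks, instead of opening one per task. The sketch below assumes that pattern and is not taken from the answers above: `make_runner` and `FakeConn` are hypothetical names, and `connect` is any zero-argument factory (in real code it could wrap `cx_Oracle.connect`). cx_Oracle also provides a `SessionPool` class, which may be a better fit.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def make_runner(connect):
    """Return a function that runs a task on a connection owned by the
    calling thread. `connect` is a zero-argument factory (an assumption,
    for example a wrapper around cx_Oracle.connect)."""
    local = threading.local()

    def run(task, *args):
        conn = getattr(local, "conn", None)
        if conn is None:
            # First task on this thread: open the connection and keep it.
            conn = local.conn = connect()
        return task(conn, *args)

    return run


# Fake connection so the sketch runs without a database (hypothetical).
class FakeConn:
    opened = 0
    _lock = threading.Lock()

    def __init__(self):
        with FakeConn._lock:
            FakeConn.opened += 1

    def query(self, value):
        return value * 2


run = make_runner(FakeConn)
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(lambda v: run(FakeConn.query, v), range(30)))

print("connections opened:", FakeConn.opened)
```

With 4 workers and 30 tasks, at most 4 connections are opened. The sketch never closes them; real code would need to close each connection once the executor has shut down.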
Upvotes: 0