Reputation: 7859
In a legacy system there is a PL/SQL procedure that calls another procedure multiple times with different parameters. The called procedure contains a lot of PL/SQL logic (IF, THEN, ELSE).
As the execution of this procedure takes very long, we thought about using concurrency to speed things up without even touching the actual logic.
I understand that there are several ways of running (PL/)SQL in parallel on Oracle (see below).
However, I wasn't able to find a way to pass different arguments/parameters to a PL/SQL procedure, execute the calls in parallel, and wait until all of them have finished executing (i.e. I'm looking for a mechanism to join all threads, or for a barrier mechanism, in Oracle).
Let's use the following simplified example on the SCOTT schema:
DECLARE
  PROCEDURE DELETE_BONUS(
    in_job IN VARCHAR2)
  IS
  BEGIN
    -- Imagine a lot of IF, ELSIF, ELSE statements here
    DELETE FROM BONUS WHERE JOB = in_job;
  END;
BEGIN
  INSERT INTO BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
  INSERT INTO BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
  INSERT INTO BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
  -- TODO execute those in parallel
  DELETE_BONUS('A');
  DELETE_BONUS('B');
  DELETE_BONUS('C');
  -- TODO wait for all procedures to finish
EXCEPTION
  WHEN OTHERS THEN
    RAISE;
END;
/
Here's what I found so far:
Can one of these approaches be used to fork and join the procedure calls? Or is there yet another approach that can?
Upvotes: 4
Views: 14657
Reputation: 15473
Just wanted to add a few notes about DBMS_PARALLEL_EXECUTE package from Oracle.
This can be used to do more than update a table, although many of the examples show this simple use case.
The trick is to use an anonymous block instead of a DML statement, and the rest of the examples are still relevant. So, instead of this:
l_sql_stmt := 'update EMPLOYEES e
SET e.salary = e.salary + 10
WHERE manager_id between :start_id and :end_id';
We might have this:
l_sql_stmt := 'BEGIN my_package.some_procedure(:start_id, :end_id); END;';
The rest of the example can be found in the "Chunk by User-Provided SQL" example section of the DBMS_PARALLEL_EXECUTE documentation.
You still need to tell Oracle the start/end IDs for each chunk (using CREATE_CHUNKS_BY_SQL). I typically store them in a separate lookup table (if predefined), or you can provide a SQL query that returns a set of start/end values. For the latter approach, try using NTILE. For example, using 8 chunks:
select min(id) as start_id, max(id) as end_id
from (
  select id, ntile(8) over (order by id) bucket
  from some_table
  where some_clause...
)
group by bucket
order by bucket;
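Putting the pieces together, a minimal sketch of the whole flow might look like the block below. The task name DEMO_PARALLEL_TASK, the parallel level of 4, and the error number are assumptions made for illustration; my_package.some_procedure and some_table are the placeholders used above. The part that matters for the question is that RUN_TASK returns only after all chunks have been processed, so it acts as the join.

```sql
DECLARE
  -- Hypothetical task name; any name unique within the schema works.
  l_task      VARCHAR2(30) := 'DEMO_PARALLEL_TASK';
  -- Returns one (start_id, end_id) row per chunk; some_table is a placeholder.
  l_chunk_sql VARCHAR2(1000) :=
    'select min(id) as start_id, max(id) as end_id
       from (select id, ntile(8) over (order by id) bucket from some_table)
      group by bucket';
  -- Anonymous block executed once per chunk; the bind names are mandatory.
  l_sql_stmt  VARCHAR2(1000) :=
    'BEGIN my_package.some_procedure(:start_id, :end_id); END;';
BEGIN
  DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => l_task);

  -- by_rowid => FALSE: the chunk boundaries are NUMBER ranges, not ROWIDs.
  DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
    task_name => l_task,
    sql_stmt  => l_chunk_sql,
    by_rowid  => FALSE);

  -- Runs the chunks in up to 4 scheduler jobs and blocks until all are done.
  DBMS_PARALLEL_EXECUTE.RUN_TASK(
    task_name      => l_task,
    sql_stmt       => l_sql_stmt,
    language_flag  => DBMS_SQL.NATIVE,
    parallel_level => 4);

  -- Keep the task for inspection if any chunk failed.
  IF DBMS_PARALLEL_EXECUTE.TASK_STATUS(l_task) != DBMS_PARALLEL_EXECUTE.FINISHED THEN
    RAISE_APPLICATION_ERROR(-20001,
      'Task ' || l_task || ' did not finish; see USER_PARALLEL_EXECUTE_CHUNKS');
  END IF;

  DBMS_PARALLEL_EXECUTE.DROP_TASK(l_task);
END;
/
```

Each chunk is committed independently by the job that processes it, and the calling user needs the CREATE JOB privilege because the package runs the chunks as scheduler jobs.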
Hope that helps
Upvotes: 3
Reputation: 7859
I solved the problem using DBMS_SCHEDULER for the jobs and DBMS_PIPE pipes for synchronization/IPC. This approach does not rely on polling and does not need additional tables, although the waiting session still wakes up once per finished job.
It's quite some effort, so if someone can propose a simpler solution, please share it!
--
-- Define stored procedures to be executed by job
--
/** Actual method that should be run in parallel*/
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS(
in_job IN VARCHAR2)
IS
BEGIN
-- Imagine a lot of IF, ELSIF, ELSE statements here
DELETE FROM TEST_BONUS WHERE JOB=in_job;
END;
/
/** Stored procedure to be run from the job: Uses pipes for job synchronization, executes PROC_DELETE_TEST_BONUS. */
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS_CONCUR(in_pipe_name IN VARCHAR2,
in_job IN VARCHAR2)
IS
flag INTEGER;
BEGIN
-- Execute actual procedure
PROC_DELETE_TEST_BONUS(in_job);
-- Signal completion
-- Use the procedure to put a message in the local buffer.
DBMS_PIPE.PACK_MESSAGE(SYSDATE ||': Success ' ||in_job);
-- Send message, success is a zero return value.
flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
EXCEPTION
WHEN OTHERS THEN
-- Signal completion
-- Use the procedure to put a message in the local buffer.
DBMS_PIPE.PACK_MESSAGE(SYSDATE ||': Failed ' || in_job);
-- Send message, success is a zero return value.
flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
RAISE;
END;
/
--
-- Run Jobs
--
DECLARE
timestart NUMBER;
duration_insert NUMBER;
jobs_amount NUMBER := 0;
retval INTEGER;
message VARCHAR2(4000);
rows_amount NUMBER;
/** Create and define the program PROG_DELETE_TEST_BONUS_CONCUR, which calls PROC_DELETE_TEST_BONUS_CONCUR, to be run as a job. */
PROCEDURE create_prog_delete_test_bonus
IS
BEGIN
-- define new in each run in order to ease development. TODO Once it works, no need to redefine for each run!
dbms_scheduler.drop_program(program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', force=> TRUE);
dbms_scheduler.create_program ( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', program_action =>
'PROC_DELETE_TEST_BONUS_CONCUR', program_type => 'STORED_PROCEDURE', number_of_arguments => 2,
enabled => FALSE );
dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR',
argument_position => 1, argument_name => 'in_pipe_name', argument_type => 'VARCHAR2');
dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name=>'PROG_DELETE_TEST_BONUS_CONCUR',
argument_position => 2, argument_name => 'in_job', argument_type => 'VARCHAR2');
dbms_scheduler.enable('PROG_DELETE_TEST_BONUS_CONCUR');
END;
/** "Forks" a job that runs PROG_DELETE_TEST_BONUS_CONCUR */
PROCEDURE RUN_TEST_BONUS_JOB(
in_pipe_name IN VARCHAR2,
in_job IN VARCHAR2,
io_job_amount IN OUT NUMBER)
IS
jobname VARCHAR2(100);
BEGIN
jobname:=DBMS_SCHEDULER.GENERATE_JOB_NAME;
dbms_scheduler.create_job(job_name => jobname, program_name =>
'PROG_DELETE_TEST_BONUS_CONCUR');
dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name =>
'in_pipe_name' , argument_value => in_pipe_name);
dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name =>
'in_job' , argument_value => in_job);
dbms_output.put_line(SYSDATE || ': Running job: '|| jobname);
dbms_scheduler.RUN_JOB(jobname, false );
io_job_amount:= io_job_amount+1;
END;
-- Anonymous "Main" block
BEGIN
create_prog_delete_test_bonus;
-- Explicitly create a pipe named after this session (third argument private => FALSE, so the pipe is public)
retval := DBMS_PIPE.CREATE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME, 100, FALSE);
dbms_output.put_line(SYSDATE || ': Created pipe: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned ' ||retval);
timestart := dbms_utility.get_time();
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
COMMIT;
duration_insert := dbms_utility.get_time() - timestart;
dbms_output.put_line(SYSDATE || ': Duration (1/100s): INSERT=' || duration_insert);
SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);
timestart := dbms_utility.get_time();
-- -- Process sequentially
-- PROC_DELETE_TEST_BONUS('A');
-- PROC_DELETE_TEST_BONUS('B');
-- PROC_DELETE_TEST_BONUS('C');
-- start concurrent processing
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'A', jobs_amount);
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'B', jobs_amount);
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'C', jobs_amount);
-- "Barrier": Wait for all jobs to finish
for i in 1 .. jobs_amount loop
-- Reset the local buffer.
DBMS_PIPE.RESET_BUFFER;
-- Wait and receive message. Timeout after an hour.
retval := SYS.DBMS_PIPE.RECEIVE_MESSAGE(SYS.DBMS_PIPE.UNIQUE_SESSION_NAME, 3600);
-- Handle errors: timeout, etc.
IF retval != 0 THEN
raise_application_error(-20000, 'Error: '||to_char(retval)||' receiving on pipe. See Job Log in table user_scheduler_job_run_details');
END IF;
-- Read message from local buffer.
DBMS_PIPE.UNPACK_MESSAGE(message);
dbms_output.put_line(SYSDATE || ': Received message on '''|| DBMS_PIPE.UNIQUE_SESSION_NAME ||''' (Status='|| retval ||'): ' || message);
end loop;
dbms_output.put(SYSDATE || ': Duration (1/100s): DELETE=');
dbms_output.put_line(dbms_utility.get_time() - timestart);
SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);
retval :=DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
dbms_output.put_line(systimestamp || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line(SYSDATE || ': ' || SUBSTR(SQLERRM, 1, 1000) || ' ' ||
SUBSTR(DBMS_UTILITY.FORMAT_ERROR_BACKTRACE, 1, 1000));
retval := DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
dbms_output.put_line(SYSDATE || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
-- Clean up in case of error
PROC_DELETE_TEST_BONUS('A');
PROC_DELETE_TEST_BONUS('B');
PROC_DELETE_TEST_BONUS('C');
RAISE;
END;
/
You should always keep in mind that the changes executed within the job are committed in a separate transaction.
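To illustrate the point about transactions, here is a minimal sketch. It assumes the TEST_BONUS table and the PROC_DELETE_TEST_BONUS procedure from above, and it uses an inline PLSQL_BLOCK job instead of the program defined earlier just to keep the example short.

```sql
BEGIN
  INSERT INTO TEST_BONUS(ENAME, JOB) VALUES ('1', 'A');
  -- Commit explicitly before starting the job: the job runs in its own
  -- session and only sees committed data.
  COMMIT;

  DBMS_SCHEDULER.CREATE_JOB(
    job_name   => DBMS_SCHEDULER.GENERATE_JOB_NAME,
    job_type   => 'PLSQL_BLOCK',
    job_action => 'BEGIN PROC_DELETE_TEST_BONUS(''A''); END;',
    enabled    => TRUE);  -- no start_date given, so it runs as soon as possible

  -- This rollback affects only the calling session. The DELETE performed by
  -- the job is committed by the job's own session and cannot be undone here.
  ROLLBACK;
END;
/
```

In other words, the caller cannot treat the parallel deletes and its own work as one atomic unit; if a job fails halfway, any compensation has to be done explicitly.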
Just to give a feeling for what this concurrency achieves, here are some averaged measurements: the sequential code in the question takes about 60s to complete, the parallel version about 40s.
It would be interesting to investigate further how this scales when more than three jobs run in parallel.
PS: A helpful query for checking the status of the jobs is the following:
SELECT job_name,
destination,
TO_CHAR(actual_start_date) AS actual_start_date,
run_duration,
TO_CHAR((ACTUAL_START_DATE+run_duration)) AS actual_end_date,
status,
error#,
ADDITIONAL_INFO
FROM user_scheduler_job_run_details
ORDER BY actual_start_date desc;
Upvotes: 3
Reputation: 5565
If yes, you can try this:
Upvotes: 0