26 September 2011

Parallelization of Scheduler Jobs with Advanced Queueing


One of the useful utilities of Oracle database is Scheduler.With Scheduler, it is possible to use an Oracle database as a complex,configurable job scheduling mechanism, like cron of unix. I will not go deepinside of the capabilities of Oracle’s Scheduler.(you can search previousposts) The main reason that made me to write this post is, making the sameScheduler Job to be run at the same time, as multiple instances. That is, it isnot possible to run the same job simultaneously. You can run the Job, after the running-onefinishes. This is not something bad, asit sounds. On the contrary, it is a must in order to keep atomicity of programunits, jobs. Remember usage of semaphores. For the shared resources, a controlmechanism should exist in order prevent common resources to be usedsimultaneously at the same time. This is main concept of parallel computing.

Suppose that, running the same job more than once at thesame time is not *something* bad for your application design. Again, supposethat your jobs has parameters and you are controlling atomicity of the job byyourself. So, you want to get rid of serial execution restrictions ofScheduler. But how?
I like finding workarounds for the problems. There arealways some problems in our lives. In order to overcome the problems that arenot created by you, you should find an exit-gate. Finding an exit-gate is notas easy as in the movies. …

Let’s go…

To make a Scheduler job to be run simulataneously, youshould take advantages of Advanced Queueing(AQ). Again, this is anotherpowerful utilities of Oracle for developers. You are not forced to createtables, programs, algorithms for a simple queue. With Oracle’s AQ, you have atoolkit for queue generation. Those interfaces that exists in the toolkit, willhelp you to create complex queues.(for more info, please check Oracle’s doc.)

I simulated serial execution(only Scheduler) and parallelexecution(Sheduler + AQ) below. I think analyzing the code pieces below, willmake you to understand better.


Connected to Oracle Database 11g Enterprise EditionRelease 11.2.0.1.0
Connected as mennan

SQL> --serial execution: scheduler program-job
SQL> CREATE TABLE LOG_TABLE
2 (
3 ID NUMBER,
4 CALLER VARCHAR2(32),
5 CALL_TIMESTAMP TIMESTAMP DEFAULT SYSTIMESTAMP
6 );

Table created
SQL> CREATE OR REPLACE PROCEDUREInsertLogTable(pin_Id IN LOG_TABLE.ID%TYPE) AS
2 BEGIN
3 INSERT INTO LOG_TABLE (ID,CALLER) VALUES(pin_Id, 'SERIAL_EXECUTION_PRG');
4 COMMIT;
5 DBMS_LOCK.sleep(3);
6 END;
7 /

Procedure created
SQL> BEGIN
2 DBMS_SCHEDULER.CREATE_PROGRAM(program_name => 'SERIAL_EXECUTION_PRG',
3 program_type =>'STORED_PROCEDURE',
4 program_action =>'InsertLogTable',
5 number_of_arguments => 1,
6 enabled => FALSE,
7 comments => 'The program that simulatesserial execution of dbms_scheduler.');
8 END;
9 /

PL/SQL procedure successfully completed
SQL> BEGIN
2 DBMS_SCHEDULER.DEFINE_PROGRAM_ARGUMENT(program_name => 'SERIAL_EXECUTION_PRG',
3 argument_position => 1,
4 argument_name => 'pin_Id',
5 argument_type => 'NUMBER',
6 default_value => NULL);
7 END;
8 /

PL/SQL procedure successfully completed
SQL> BEGIN
2 DBMS_SCHEDULER.CREATE_JOB(job_name => 'SERIAL_EXECUTION_JOB',
3 program_name=> 'SERIAL_EXECUTION_PRG',
4 enabled => FALSE,
5 auto_drop => FALSE,
6 comments => 'The job that simulates serialexecution of dbms_scheduler by inserting log table.');
7 END;
8 /

PL/SQL procedure successfully completed
SQL> BEGIN
2 DBMS_SCHEDULER.enable(NAME =>'SERIAL_EXECUTION_PRG');
3 END;
4 /

PL/SQL procedure successfully completed
SQL> BEGIN
2 DBMS_SCHEDULER.enable(NAME =>'SERIAL_EXECUTION_JOB');
3 END;
4 /

PL/SQL procedure successfully completed
SQL> pause;
SQL> ----TEST:run 1
SQL> BEGIN
2 DBMS_SCHEDULER.set_job_argument_value(job_name => 'SERIAL_EXECUTION_JOB',
3 argument_position => 1,
4 argument_value => 1);
5 DBMS_SCHEDULER.run_job(job_name => 'SERIAL_EXECUTION_JOB',
6 use_current_session=> FALSE);
7 END;
8 /

PL/SQL procedure successfully completed
SQL> --run 2
SQL> BEGIN
2 DBMS_SCHEDULER.set_job_argument_value(job_name => 'SERIAL_EXECUTION_JOB',
3 argument_position => 1,
4 argument_value => 2);
5 DBMS_SCHEDULER.run_job(job_name => 'SERIAL_EXECUTION_JOB',
6 use_current_session =>FALSE);
7 END;
8 /

BEGIN
DBMS_SCHEDULER.set_job_argument_value(job_name => 'SERIAL_EXECUTION_JOB',
argument_position => 1,
argument_value => 2);
DBMS_SCHEDULER.run_job(job_name => 'SERIAL_EXECUTION_JOB',
use_current_session=> FALSE);
END;

ORA-27478:"MENNAN.SERIAL_EXECUTION_JOB" işi çalışıyor
ORA-06512: konum"SYS.DBMS_ISCHED", satır 185
ORA-06512: konum"SYS.DBMS_SCHEDULER", satır486
ORA-06512: konum satır 6


SQL> select * from log_table where id is notnull;

ID CALLER CALL_TIMESTAMP
---------- ---------------------------------------------------------------------------------
1SERIAL_EXECUTION_PRG 26/09/2011 20:26:30,689920

SQL>

---rollback
DROP TABLE LOG_TABLE;
DROP PROCEDURE InsertLogTable;
BEGIN
DBMS_SCHEDULER.drop_job(job_name => 'SERIAL_EXECUTION_JOB');
END;
/

BEGIN
DBMS_SCHEDULER.drop_program(program_name => 'SERIAL_EXECUTION_PRG');
END;
/






SQL> --parallel execution: scheduler program-job + advanced queue
SQL> CREATE TABLE LOG_TABLE
2 (
3 ID NUMBER,
4 CALLER VARCHAR2(32),
5 CALL_TIMESTAMP TIMESTAMP DEFAULT SYSTIMESTAMP
6 );

Table created
SQL> CREATE OR REPLACE TYPE LOG_TYPE IS OBJECT
2 (
3 ID NUMBER
4 );
5 /

Type created
SQL> BEGIN
2 DBMS_AQADM.create_queue_table(queue_table => 'AQ_LOG_QUEUE_TABLE',
3 queue_payload_type => 'LOG_TYPE',
4 multiple_consumers => TRUE);
5 END;
6 /

PL/SQL procedure successfully completed
SQL> BEGIN
2 DBMS_AQADM.create_queue(queue_name => 'AQ_LOG_QUEUE',
3 queue_table =>'AQ_LOG_QUEUE_TABLE');
4 END;
5 /

PL/SQL procedure successfully completed
SQL> BEGIN
2 DBMS_AQADM.start_queue(queue_name =>'AQ_LOG_QUEUE',
3 enqueue => TRUE,
4 dequeue => TRUE);
5 END;
6 /

PL/SQL procedure successfully completed
SQL> --select * from all_queue_tables wherequeue_table = 'AQ_TEST_QUEUE_TABLE';
SQL> --select * from all_queues where NAME ='AQ_TEST_QUEUE';
SQL> CREATE OR REPLACE PROCEDUREInsertLogTable2(pit_LogType IN LOG_TYPE) AS
2 BEGIN
3 INSERT INTO LOG_TABLE (ID,CALLER) VALUES(pit_LogType.ID, 'PARALLEL_EXECUTION_PRG');
4 COMMIT;
5 DBMS_LOCK.sleep(3);
6 END;
7 /

Procedure created
SQL> BEGIN
2 DBMS_SCHEDULER.create_program(program_name => 'PARALLEL_EXECUTION_PRG',
3 program_type =>'STORED_PROCEDURE',
4 program_action =>'InsertLogTable2',
5 number_of_arguments => 1);
6 DBMS_SCHEDULER.define_metadata_argument(program_name => 'PARALLEL_EXECUTION_PRG',
7 argument_position => 1,
8 metadata_attribute=> 'EVENT_MESSAGE');
9 DBMS_SCHEDULER.enable('PARALLEL_EXECUTION_PRG');
10 END;
11 /

PL/SQL procedure successfully completed
SQL> BEGIN
2 DBMS_SCHEDULER.create_job(job_name => 'PARALLEL_EXECUTION_JOB',
3 program_name => 'PARALLEL_EXECUTION_PRG',
4 event_condition=> 'TAB.USER_DATA.ID IS NOT NULL',
5 queue_spec => 'AQ_LOG_QUEUE',
6 auto_drop => FALSE);
7 DBMS_SCHEDULER.enable('PARALLEL_EXECUTION_JOB');
8 END;
9 /

PL/SQL procedure successfully completed
SQL> BEGIN
2 DBMS_SCHEDULER.set_attribute(NAME => 'PARALLEL_EXECUTION_JOB',
3 ATTRIBUTE=> 'parallel_instances',
4 VALUE => TRUE);
5 END;
6 /

PL/SQL procedure successfully completed
SQL> --select * from all_scheduler_job_log where job_name = 'PARALLEL_EXECUTION_JOB'order by 1 desc;
SQL> --select * fromall_scheduler_job_run_details wherejob_name = 'PARALLEL_EXECUTION_JOB' order by 1 desc;
SQL> -- TEST : run 1
SQL> DECLARE
2 vt_EnqueueOptions DBMS_AQ.ENQUEUE_OPTIONS_T;
3 vt_MessagePropertiesDBMS_AQ.MESSAGE_PROPERTIES_T;
4 vt_RequestObject LOG_TYPE;
5 vr_MessageId RAW(16);
6 BEGIN
7 vt_RequestObject := LOG_TYPE(123);
8
9 DBMS_AQ.enqueue(queue_name => 'AQ_LOG_QUEUE',
10 enqueue_options => vt_EnqueueOptions,
11 message_properties =>vt_MessageProperties,
12 payload => vt_RequestObject,
13 msgid => vr_MessageId);
14 dbms_output.put_line('vr_MessageId : ' ||vr_MessageId);
15 COMMIT;
16 END;
17 /

PL/SQL procedure successfully completed
SQL> -- run 2
SQL> DECLARE
2 vt_EnqueueOptions DBMS_AQ.ENQUEUE_OPTIONS_T;
3 vt_MessagePropertiesDBMS_AQ.MESSAGE_PROPERTIES_T;
4 vt_RequestObject LOG_TYPE;
5 vr_MessageId RAW(16);
6 BEGIN
7 vt_RequestObject := LOG_TYPE(124);
8
9 DBMS_AQ.enqueue(queue_name => 'AQ_LOG_QUEUE',
10 enqueue_options => vt_EnqueueOptions,
11 message_properties =>vt_MessageProperties,
12 payload => vt_RequestObject,
13 msgid => vr_MessageId);
14 dbms_output.put_line('vr_MessageId : ' ||vr_MessageId);
15 COMMIT;
16 END;
17 /

PL/SQL procedure successfully completed
SQL> select * from LOG_TABLE;

IDCALLER CALL_TIMESTAMP
---------- ---------------------------------------------------------------------------------
123PARALLEL_EXECUTION_PRG 26/09/2011 20:42:34,078989
124PARALLEL_EXECUTION_PRG 26/09/2011 20:42:34,080255

SQL>
SQL>

--rollback
BEGIN
dbms_scheduler.drop_job('PARALLEL_EXECUTION_JOB');
dbms_scheduler.drop_program('PARALLEL_EXECUTION_PRG');
dbms_aqadm.stop_queue('AQ_LOG_QUEUE');
dbms_aqadm.drop_queue('AQ_LOG_QUEUE');
dbms_aqadm.drop_queue_table('AQ_LOG_QUEUE_TABLE');
END;
/
DROP TABLE LOG_TABLE;
DROP TYPE LOG_TYPE;
DROP PROCEDURE InsertLogTable2;

No comments: