Aychin's Oracle RDBMS Blog

Only for Advanced level Professionals

Using events with DBMS_SCHEDULER (example of DDL auditing)


Starting with version 10g, Oracle provides the DBMS_SCHEDULER package to create and manage jobs. It also introduces new objects: windows and window groups, schedules, programs, job classes and chains. Jobs can now be event-based: your application detects some event and signals the Scheduler to run the appropriate job. The Scheduler subscribes to a queue, and when a message arrives that meets the specified condition, the job runs. The queue can be a system queue or a user-defined one. An example will make the concept clearer.

Suppose we need to log all DDL operations in our database, excluding system-generated DDL. We will create a system-level AFTER DDL trigger to capture each DDL statement; the trigger will enqueue information about the session and the action that executed the DDL. The Scheduler will be subscribed to this queue, and when a message is received it will run a job that filters out system sessions and logs the information to a table.

You may ask why we need such a complex structure when we could do everything in the trigger itself. If we put the code in the trigger and do some processing there, the DDL statement must wait for that processing to complete; and if the code has a problem, it affects the DDL itself and causes errors. Using a queue and an event-based job is more flexible and separates our code from the trigger: the trigger only enqueues the captured information and returns control to the session with minimal delay. The Scheduler receives the message, and all processing is done on the job side without any impact on the user's session.

Note: this code is for testing and learning only. It has not been tested in a production environment, and it is not guaranteed to capture 100% of DDL statements because of some restrictions of the Scheduler engine in 10g, described in the Problems section below.

OK, let's create the table that will be used for logging:

create table sys.ddllog
    (
     ddl_timestamp date,          --Timestamp of the DDL
     sysevent varchar2(100),      --DDL command, like "CREATE" or "DROP"
     login_user varchar2(50),     --Login username
     instance_num number,         --Instance number
     database_name varchar2(50),  --Database Name
     dict_obj_name varchar2(100), --Dictionary object name on which DDL was performed
     dict_obj_type varchar2(100), --Type of this object
     dict_obj_owner varchar2(50), --Owner of this object
     host varchar2(100),          --Host from which connection comes
     ip varchar2(15),             --IP address
     os_user varchar2(50),        --OS username
     obj_current_ddl clob         --This field will store current DDL of the object
    );

Now we need a queue. To create a working queue we must first create a queue table. A queue table is based on an object type, called the payload type, which defines the structure of the messages enqueued to and dequeued from the queue.

This statement creates the object type “sys.after_ddl_queue_type”, which will be used as the payload type for the queue table:

create or replace type sys.after_ddl_queue_type as object
    (
     ddl_timestamp date,
     sysevent varchar2(100),
     ora_login_user varchar2(50),
     ora_instance_num number,
     ora_database_name varchar2(50),
     ora_dict_obj_name varchar2(100),
     ora_dict_obj_type varchar2(100),
     ora_dict_obj_owner varchar2(50),
     host varchar2(100),
     ip varchar2(15),
     os_user varchar2(50),
     ddl_text clob
    );
/

Now let's create our queue table:

begin
 dbms_aqadm.create_queue_table(
     queue_table=>'SYS.AFTER_DDL_QUEUE_TABLE',      --Queue table name, will be created in SYS schema
     queue_payload_type=>'SYS.AFTER_DDL_QUEUE_TYPE',
     multiple_consumers=>TRUE);                     --This parameter allows multiple consumers to subscribe to the related queue
end;
/

The multiple_consumers parameter must be set to TRUE to use this queue with the Scheduler. Now that our queue table is created, you can use the SQL*Plus “desc” command to list its columns; you will see the user_data column of type after_ddl_queue_type. Now we can create the queue; let's name it “after_ddl_queue”:

begin
 dbms_aqadm.create_queue(
     queue_name=>'SYS.AFTER_DDL_QUEUE',
     queue_table=>'SYS.AFTER_DDL_QUEUE_TABLE',
     comment=>'Queue for the sys.after_ddl trigger, used for the job sys.after_ddl_job');
end;
/

The queue is created, but by default it is not enabled for enqueue or dequeue:

select enqueue_enabled, dequeue_enabled from dba_queues where name='AFTER_DDL_QUEUE';

enqueue_enabled dequeue_enabled
--------------- ---------------
             NO              NO

We need to enable it manually:

begin
 dbms_aqadm.start_queue(queue_name=>'SYS.AFTER_DDL_QUEUE', enqueue=>TRUE, dequeue=>TRUE);
end;
/

Now our queue is ready.
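
As a quick sanity check, the state of the queue and its queue table can be read back from the data dictionary. This is only a sketch; it assumes you are connected as a user who can query the DBA_QUEUES and DBA_QUEUE_TABLES views.

```sql
-- Confirm that the queue is started and that its queue table
-- was created for multiple consumers (RECIPIENTS = 'MULTIPLE').
select q.name,
       q.enqueue_enabled,
       q.dequeue_enabled,
       t.object_type,      -- payload type of the queue table
       t.recipients        -- SINGLE or MULTIPLE
  from dba_queues q
  join dba_queue_tables t
    on t.owner = q.owner
   and t.queue_table = q.queue_table
 where q.owner = 'SYS'
   and q.name = 'AFTER_DDL_QUEUE';
```

After start_queue both the enqueue and dequeue flags should show YES.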

The Scheduler will listen to this queue, and when a message is received it will run the job. We need to think about how to pass this message to the job's program as an argument. There is a special Scheduler metadata attribute called event_message; it is intended for event-based jobs and contains the content of the message that started the job. Using this metadata argument we will pass the message content from the queue to our program. At this step we create the stored procedure “sys.after_ddl_proc”, which we will use in the Scheduler program. It has one input parameter, message, of type sys.after_ddl_queue_type, and it handles the message passed through the event_message metadata attribute.

create or replace procedure sys.after_ddl_proc (message in sys.after_ddl_queue_type)
as
begin
 if message.ora_login_user != 'SYS' then      --Check, if the user is not SYS, then proceed
   insert into sys.ddllog values
    (message.ddl_timestamp,
     message.sysevent,
     message.ora_login_user,
     message.ora_instance_num,
     message.ora_database_name,
     message.ora_dict_obj_name,
     message.ora_dict_obj_type,
     message.ora_dict_obj_owner,
     message.host,
     message.ip,
     message.os_user,
     message.ddl_text);
   commit write batch nowait;
 end if;
end;
/

As you can see, we check the ora_login_user attribute of the message, which contains the username of the session that executed the DDL. If it is not the SYS user, we insert all attributes into our log table sys.ddllog. After that we commit. I used “commit write batch nowait” here to minimize commit time as much as possible; the reason is related to a limitation of the Scheduler in version 10g, which I will explain at the end of this post.
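
Before wiring the procedure into the Scheduler, it can be exercised on its own by building a message by hand. The following is a minimal sketch; every value in the constructor (user, database name, host, object name and so on) is a made-up placeholder, not something captured from a real session.

```sql
-- Call the handler directly with a hand-built message.
-- All attribute values below are hypothetical test data.
declare
  msg sys.after_ddl_queue_type;
begin
  msg := sys.after_ddl_queue_type(
           sysdate,                                -- ddl_timestamp
           'CREATE',                               -- sysevent
           'SCOTT',                                -- ora_login_user (anything except SYS gets logged)
           1,                                      -- ora_instance_num
           'TESTDB',                               -- ora_database_name
           'HANDLER_TEST',                         -- ora_dict_obj_name
           'TABLE',                                -- ora_dict_obj_type
           'SCOTT',                                -- ora_dict_obj_owner
           'test-host',                            -- host
           '127.0.0.1',                            -- ip
           'tester',                               -- os_user
           'create table handler_test (id number)' -- ddl_text
         );
  sys.after_ddl_proc(msg);
end;
/

-- The procedure commits, so remove the test row afterwards.
delete from sys.ddllog where dict_obj_name = 'HANDLER_TEST';
commit;
```

Passing 'SYS' as the third attribute instead should leave sys.ddllog unchanged, which is an easy way to confirm the filter.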

Now let's create the Scheduler program:

begin
 dbms_scheduler.create_program (program_name=>'SYS.AFTER_DDL_PROGRAM',
                                program_type=>'STORED_PROCEDURE',
                                program_action=>'SYS.AFTER_DDL_PROC',
                                number_of_arguments=>1);

 dbms_scheduler.define_metadata_argument (program_name=>'SYS.AFTER_DDL_PROGRAM',
                                          argument_position=>1,
                                          metadata_attribute=>'EVENT_MESSAGE');
end;
/

We created the Scheduler program SYS.AFTER_DDL_PROGRAM: the program type is STORED_PROCEDURE, the action is our AFTER_DDL_PROC, and it takes one argument (message). Then we defined the Scheduler metadata argument event_message for our program. Programs are created disabled by default; to enable the program, use the dbms_scheduler.enable procedure:

begin
 dbms_scheduler.enable('SYS.AFTER_DDL_PROGRAM');
end;
/
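
To confirm that the program is enabled and that its single argument is bound to the metadata attribute, the Scheduler dictionary views can be queried. A small sketch, assuming access to the DBA_SCHEDULER_* views:

```sql
-- Show the program together with its argument definition.
select p.program_name,
       p.program_type,
       p.enabled,
       a.argument_position,
       a.metadata_attribute   -- expected to show EVENT_MESSAGE
  from dba_scheduler_programs p
  left join dba_scheduler_program_args a
    on a.owner = p.owner
   and a.program_name = p.program_name
 where p.owner = 'SYS'
   and p.program_name = 'AFTER_DDL_PROGRAM';
```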

Now we will create a job class and assign a resource consumer group, a logging level and a log retention period to it. Using a job class is not mandatory in our case, but it is good practice. You can also use this job class for any other administrative jobs in your system.

begin
 dbms_scheduler.create_job_class(job_class_name=>'ADMIN_JOBS',
                                 resource_consumer_group=>'SYS_GROUP',
                                 logging_level=>DBMS_SCHEDULER.LOGGING_FULL,
                                 log_history=>90);          --Retention period of the logs will be 90 days
end;
/
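
The settings of the new job class can be read back like this (a sketch, assuming access to DBA_SCHEDULER_JOB_CLASSES):

```sql
-- Check the consumer group, logging level and log retention of the job class.
select job_class_name,
       resource_consumer_group,
       logging_level,
       log_history            -- retention in days
  from dba_scheduler_job_classes
 where job_class_name = 'ADMIN_JOBS';
```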

At this step we create the job. It will be event-based, which is why we use two main parameters, event_condition and queue_spec. The Oracle documentation describes these two parameters best:

event_condition This is a conditional expression based on the columns of the event source queue table. The expression must have the syntax of an Advanced Queuing rule. Accordingly, you can include user data properties in the expression provided that the message payload is an object type, and that you prefix object attributes in the expression with tab.user_data. For more information on rules, see the DBMS_AQADM.ADD_SUBSCRIBER procedure.
queue_spec This argument specifies the queue into which events that start this particular job will be enqueued (the source queue). If the source queue is a secure queue, the queue_spec argument is a string containing a pair of values of the form queue_name, agent name. For non-secure queues, only the queue name need be provided. If a fully qualified queue name is not provided, the queue is assumed to be in the job owner’s schema. In the case of secure queues, the agent name provided should belong to a valid agent that is currently subscribed to the queue.

In our case queue_spec will be SYS.AFTER_DDL_QUEUE. It is not a secure queue, so we don't need an agent name. The event_condition specifies the condition under which the Scheduler must run the job. To run the job for each message received from the queue, I will use the condition "tab.user_data.ddl_timestamp is not null": because every entry in the queue has the ddl_timestamp attribute set, the job will run for each message.

begin
 dbms_scheduler.create_job(job_name=>'SYS.AFTER_DDL_JOB',
                           job_class=>'SYS.ADMIN_JOBS',
                           program_name=>'SYS.AFTER_DDL_PROGRAM',
                           start_date=>systimestamp,
                           event_condition=>'tab.user_data.ddl_timestamp is not null',
                           queue_spec=>'SYS.AFTER_DDL_QUEUE',
                           auto_drop=>false);
end;
/

Jobs are created disabled by default, so let's enable it:

begin
 dbms_scheduler.enable('SYS.AFTER_DDL_JOB');
end;
/
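
To see how the Scheduler recorded the event settings of the job, the job definition can be queried. A sketch, assuming access to DBA_SCHEDULER_JOBS:

```sql
-- Verify that the job is enabled and bound to the queue and condition we specified.
select job_name,
       enabled,
       state,
       event_queue_owner,
       event_queue_name,
       event_condition
  from dba_scheduler_jobs
 where owner = 'SYS'
   and job_name = 'AFTER_DDL_JOB';
```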

Now all components are ready. The last step is to create the system-level AFTER DDL trigger, which will enqueue all captured information to SYS.AFTER_DDL_QUEUE:

create or replace trigger sys.after_ddl after ddl on database
declare
 ddl_text_var dbms_standard.ora_name_list_t;       --ora_name_list_t is a table of varchar2(64)
 full_ddl_text clob;                               --There will be stored the full DDL text
 enqueue_options dbms_aq.enqueue_options_t;        --Enqueue Options Type
 message_properties dbms_aq.message_properties_t;  --Message Properties Type
 message sys.after_ddl_queue_type;
 msgid raw(16);                                    --Message ID, it will be returned from the dbms_aq.enqueue procedure
begin
 for i in 1..ora_sql_txt(ddl_text_var) loop        --This loop rebuilds the full DDL text: ddl_text_var is
   full_ddl_text:=full_ddl_text||ddl_text_var(i);  --just a table of 64-byte pieces of the DDL, so we
 end loop;                                         --concatenate them to get the full statement.
 message:=after_ddl_queue_type(sysdate,
                               ora_sysevent,
                               ora_login_user,
                               ora_instance_num,
                               ora_database_name,
                               ora_dict_obj_name,
                               ora_dict_obj_type,
                               ora_dict_obj_owner,
                               sys_context('userenv','HOST'),
                               sys_context('userenv','IP_ADDRESS'),
                               sys_context('userenv','OS_USER'),
                               full_ddl_text);
 if ora_sysevent='DROP' then
   message_properties.delay:=1;
 end if;
 dbms_aq.enqueue(queue_name=>'SYS.AFTER_DDL_QUEUE',
                 enqueue_options=>enqueue_options,
                 message_properties=>message_properties,
                 payload=> message,
                 msgid=>msgid);
end;
/

We use the defaults for the enqueue options. We also use the system event attribute functions ora_sysevent, ora_login_user, ora_instance_num, ora_database_name, ora_dict_obj_name, ora_dict_obj_type, ora_dict_obj_owner and ora_sql_txt. The host, IP address and OS user come from the userenv namespace of the system context.

First of all, let's look at ddl_text_var. I declared this variable with the type dbms_standard.ora_name_list_t, which is defined as:

type ora_name_list_t is table of varchar2(64);

As you can see, it is just a table of strings up to 64 bytes long. The system attribute function ora_sql_txt is just a synonym for the function sys.sql_txt, which in turn calls the dbms_standard.sql_txt function. Here is the declaration of that function:

function sql_txt (sql_text out ora_name_list_t) return binary_integer;

It has one output argument, sql_text, of type ora_name_list_t, and it returns a binary_integer that represents the number of 64-byte pieces of the DDL text. That is why we loop from 1 to the number of pieces and concatenate the pieces in the loop to get the full DDL text.
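
The reassembly step can be tried outside a trigger with a hand-filled collection. This is only an illustration: the two pieces below are invented sample strings, and in the real trigger the collection is filled by ora_sql_txt rather than by a constructor.

```sql
set serveroutput on

declare
  -- Hypothetical sample pieces standing in for what ora_sql_txt would return.
  pieces    dbms_standard.ora_name_list_t :=
              dbms_standard.ora_name_list_t('create table demo_pieces ',
                                            '(id number)');
  full_text clob;
begin
  -- Concatenate the pieces in order to rebuild the whole statement.
  for i in 1 .. pieces.count loop
    full_text := full_text || pieces(i);
  end loop;
  dbms_output.put_line(full_text);
end;
/
```

The loop has the same shape as the one in the trigger; only the source of the pieces differs.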

There is one more interesting point: I set the “delay” property for DROP event messages. When we execute a “drop table <table name>;” statement, two messages are generated in the queue: the first is an ALTER event for that table, and the second is the DROP system event for the same table. This is because of the Recycle Bin functionality. When we issue a DROP statement, Oracle first inserts specific information into the sys.RecycleBin$ table and then executes “alter table <table name> rename to BIN$<sys_specific_name>;”. That is why our trigger fires twice. The exception is when the initialization parameter recyclebin is set to “off” or we use the “drop table <table name> purge;” statement to bypass the Recycle Bin. The reason for setting the “delay” property to 1 second is to give the job time to handle the first ALTER message. Why the job needs this time is described in the Problems section of this post.
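
Whether a plain DROP TABLE will produce the extra ALTER event on your system depends on the recyclebin setting, which can be checked as follows (a sketch, assuming access to V$PARAMETER):

```sql
-- 'on' means a plain DROP TABLE goes through the Recycle Bin
-- and therefore fires the trigger for the rename as well.
select name, value
  from v$parameter
 where name = 'recyclebin';
```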

Now, when a user executes DDL, the trigger SYS.AFTER_DDL enqueues all information about the action to SYS.AFTER_DDL_QUEUE and returns control to the user. The Scheduler dequeues the message and passes it to SYS.AFTER_DDL_JOB, which handles it and creates the log record in the SYS.DDLLOG table.
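
To watch messages move through the queue, you can look at the AQ$ view that Oracle creates for every queue table. A sketch, assuming the view name follows the usual AQ$<queue table name> pattern; the consumer name shown will be whatever agent the Scheduler registered for the job.

```sql
-- Count messages per state (for example WAIT while delayed, READY, PROCESSED) and consumer.
select msg_state,
       consumer_name,
       count(*) as messages
  from sys.aq$after_ddl_queue_table
 group by msg_state, consumer_name
 order by msg_state, consumer_name;
```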

Problems

This is a note from the official Oracle documentation:

Note: The Scheduler runs the event-based job for each occurrence of an event that matches event_condition. However, events that occur while the job is already running are ignored; the event gets consumed, but does not trigger another run of the job. Beginning in Oracle Database 11g Release 1, you can change this default behavior by setting the job attribute PARALLEL_INSTANCES to TRUE. In this case, an instance of the job is started for every instance of the event, and all job instances are lightweight jobs. See the SET_ATTRIBUTE procedure in Oracle Database PL/SQL Packages and Types Reference for details.

This means that in Oracle 10g, if we enqueue many messages at once, the job will not have time to handle all of them. In other words, if somebody executes many DDL statements in quick succession, for example from a script, some of them will not be handled by the job. That is why I used “commit write batch nowait” in the handler procedure: to minimize execution time and maximize the number of handled messages. With some programming techniques we could avoid this in 10g too, but for this example it would be extra coding. If you use Oracle 11g, you can set the PARALLEL_INSTANCES attribute for the job; this solves the issue, and you can remove the lines in the trigger that set the “delay” message property for DROP events.

begin
 dbms_scheduler.set_attribute('SYS.AFTER_DDL_JOB','parallel_instances',TRUE);
end;
/
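
One way to get a feeling for how many events actually started the job is to look at its run history. This is a sketch, assuming access to DBA_SCHEDULER_JOB_RUN_DETAILS; with the LOGGING_FULL level of the ADMIN_JOBS class each run should leave a row here, though how lightweight parallel instances are logged may differ between versions.

```sql
-- Most recent runs of the event-based job, newest first.
select log_date,
       status,
       error#,
       run_duration
  from dba_scheduler_job_run_details
 where owner = 'SYS'
   and job_name = 'AFTER_DDL_JOB'
 order by log_date desc;
```

If a script issues twenty DDL statements and far fewer runs appear here, the missing ones were most likely consumed while the job was still busy.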

Testing

OK, now DDL executed by non-SYS users should be captured and logged in the sys.ddllog table.

SQL> conn scott/scott
Connected.

SQL> create table testjob (col1 number, col2 varchar2(100));

Table created.

SQL> set linesize 200
SQL> set pagesize 2000
SQL> col sysevent format a10
SQL> col login_user format a10
SQL> col dict_obj_type format a10
SQL> col dict_obj_owner format a10
SQL> col dict_obj_name format a20
SQL> set long 10000

SQL> select ddl_timestamp, sysevent, login_user, dict_obj_type, dict_obj_owner, dict_obj_name, obj_current_ddl
  from sys.ddllog order by ddl_timestamp desc;

DDL_TIMESTAMP       SYSEVENT   LOGIN_USER DICT_OBJ_T DICT_OBJ_O DICT_OBJ_NAME        OBJ_CURRENT_DDL
------------------- ---------- ---------- ---------- ---------- -------------------- --------------------------------------------------------------------------------
01.05.2010 15:37:58 CREATE     SCOTT      TABLE      SCOTT      TESTJOB              create table testjob (col1 number, col2 varchar2(100))

1 row selected.

SQL> drop table testjob;

Table dropped.

SQL> select ddl_timestamp, sysevent, login_user, dict_obj_type, dict_obj_owner, dict_obj_name, obj_current_ddl
  from sys.ddllog order by ddl_timestamp desc;

DDL_TIMESTAMP       SYSEVENT   LOGIN_USER DICT_OBJ_T DICT_OBJ_O DICT_OBJ_NAME        OBJ_CURRENT_DDL
------------------- ---------- ---------- ---------- ---------- -------------------- --------------------------------------------------------------------------------
01.05.2010 15:41:50 ALTER      SCOTT      TABLE      SCOTT      TESTJOB              ALTER TABLE "SCOTT"."TESTJOB" RENAME TO "BIN$hcgVxJI7VKXgRAAUwmUQdQ==$0"
01.05.2010 15:41:50 DROP       SCOTT      TABLE      SCOTT      TESTJOB              drop table testjob
01.05.2010 15:37:58 CREATE     SCOTT      TABLE      SCOTT      TESTJOB              create table testjob (col1 number, col2 varchar2(100))

3 rows selected.

SQL> create user imran identified by sercret;

User created.

SQL> select ddl_timestamp, sysevent, login_user, dict_obj_type, dict_obj_owner, dict_obj_name, obj_current_ddl
  from sys.ddllog order by ddl_timestamp desc;

DDL_TIMESTAMP       SYSEVENT   LOGIN_USER DICT_OBJ_T DICT_OBJ_O DICT_OBJ_NAME        OBJ_CURRENT_DDL
------------------- ---------- ---------- ---------- ---------- -------------------- --------------------------------------------------------------------------------
01.05.2010 15:49:53 CREATE     SCOTT      USER                  IMRAN                create user imran identified by ********
01.05.2010 15:41:50 ALTER      SCOTT      TABLE      SCOTT      TESTJOB              ALTER TABLE "SCOTT"."TESTJOB" RENAME TO "BIN$hcgVxJI7VKXgRAAUwmUQdQ==$0"
01.05.2010 15:41:50 DROP       SCOTT      TABLE      SCOTT      TESTJOB              drop table testjob
01.05.2010 15:37:58 CREATE     SCOTT      TABLE      SCOTT      TESTJOB              create table testjob (col1 number, col2 varchar2(100))

4 rows selected.

SQL> DroP user IMRAN;

User dropped.

SQL> select ddl_timestamp, sysevent, login_user, dict_obj_type, dict_obj_owner, dict_obj_name, obj_current_ddl
  from sys.ddllog order by ddl_timestamp desc;

DDL_TIMESTAMP       SYSEVENT   LOGIN_USER DICT_OBJ_T DICT_OBJ_O DICT_OBJ_NAME        OBJ_CURRENT_DDL
------------------- ---------- ---------- ---------- ---------- -------------------- --------------------------------------------------------------------------------
01.05.2010 15:57:50 DROP       SCOTT      USER                  IMRAN                DroP user IMRAN
01.05.2010 15:49:53 CREATE     SCOTT      USER                  IMRAN                create user imran identified by ********
01.05.2010 15:41:50 ALTER      SCOTT      TABLE      SCOTT      TESTJOB              ALTER TABLE "SCOTT"."TESTJOB" RENAME TO "BIN$hcgVxJI7VKXgRAAUwmUQdQ==$0"
01.05.2010 15:41:50 DROP       SCOTT      TABLE      SCOTT      TESTJOB              drop table testjob
01.05.2010 15:37:58 CREATE     SCOTT      TABLE      SCOTT      TESTJOB              create table testjob (col1 number, col2 varchar2(100))

5 rows selected.

Rollback all changes

You can use this script if you want to roll back all changes that were made to the database:

drop trigger sys.after_ddl;

exec dbms_scheduler.drop_job('SYS.AFTER_DDL_JOB');

exec dbms_scheduler.drop_job_class('ADMIN_JOBS');

exec dbms_scheduler.drop_program('SYS.AFTER_DDL_PROGRAM');

drop procedure sys.after_ddl_proc;

exec dbms_aqadm.stop_queue('SYS.AFTER_DDL_QUEUE');

exec dbms_aqadm.drop_queue('SYS.AFTER_DDL_QUEUE');

exec dbms_aqadm.drop_queue_table('SYS.AFTER_DDL_QUEUE_TABLE');

drop type sys.after_ddl_queue_type;

drop table sys.ddllog;
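
After running the script, a dictionary query can confirm that nothing from this example is left behind. A sketch, assuming access to DBA_OBJECTS; if the cleanup completed, both queries would be expected to return no rows.

```sql
-- Look for leftover objects created by this example.
select owner, object_name, object_type
  from dba_objects
 where owner = 'SYS'
   and (object_name like 'AFTER_DDL%' or object_name = 'DDLLOG')
 order by object_type, object_name;

-- The job class is checked separately.
select job_class_name
  from dba_scheduler_job_classes
 where job_class_name = 'ADMIN_JOBS';
```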

(c) Aychin Gasimov, 04/2010, Azerbaijan Republic