stream

Original post | Category: Linux operating systems | Author: liyijie78 | Date: 2011-03-12 18:52:00

I have been playing with Oracle Streams again lately. My goal is to capture changes in 10g and send them to a 9i database.

Below is the short list for setting up Change Data Capture using Oracle Streams. These steps are mostly from the docs with a few tweaks I have added. This entry only covers setting up the local capture and apply. I'll add the propagation to 9i later this week or next weekend.

First the setup: we will use the HR account's EMPLOYEES table. We'll capture all changes to the EMPLOYEES table and insert them into an audit table. I'm not necessarily saying this is the way you should audit your database but it makes a nice example.

I'll also add a monitoring piece to the capture process. I want to be able to see exactly what is being captured when it is being captured.

You will need to have sysdba access to follow along with me. Your database must also be in archivelog mode. The changes are picked up from the redo log.
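
A quick way to confirm the archiving mode before going any further. This is only a minimal sketch, run from a sysdba session; the setup here needs it to report ARCHIVELOG:

```sql
-- Report the archiving mode of the current database.
-- Possible values are ARCHIVELOG, NOARCHIVELOG and MANUAL.
SELECT log_mode FROM v$database;
```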

So, away we go! The first step is to create our streams administrator. I will follow the guidelines from the Oracle docs exactly for this:

Connect as sysdba:

sqlplus / as sysdba 

Create the streams tablespace (change the name and/or location to suit):

create tablespace streams_tbs datafile 'c:\temp\stream_tbs.dbf' size 25M reuse autoextend on maxsize unlimited; 

Create our streams administrator:

create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs; 

I haven't quite figured out why, but we need to grant our administrator DBA privs. I think this is a bad thing. There is probably a work around where I could do some direct grants instead but I haven't had time to track those down.

grant dba to strmadmin; 

We also want to grant streams admin privs to the user.

BEGIN
  DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
    grantee          => 'strmadmin',
    grant_privileges => true);
END;
/

The next steps we'll run as the HR user.

conn hr/hr 

Grant all access to the employee table to the streams admin:

grant all on hr.employees to strmadmin; 

We also need to create the employee_audit table. Note that I am adding three columns in this table that do not exist in the employee table.

CREATE TABLE employee_audit(
  employee_id    NUMBER(6),
  first_name     VARCHAR2(20),
  last_name      VARCHAR2(25),
  email          VARCHAR2(25),
  phone_number   VARCHAR2(20),
  hire_date
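
The listing above is cut off after hire_date. What follows is only a guess at a completion, not the original statement. It assumes the remaining columns mirror the standard HR.EMPLOYEES sample schema, and that the three extra columns are the ones the DML handler later adds (UPD_DATE, USER_NAME, ACTION). The types chosen for those three columns are assumptions.

```sql
-- Hypothetical completion of the truncated CREATE TABLE statement.
CREATE TABLE employee_audit(
  employee_id    NUMBER(6),
  first_name     VARCHAR2(20),
  last_name      VARCHAR2(25),
  email          VARCHAR2(25),
  phone_number   VARCHAR2(20),
  hire_date      DATE,
  job_id         VARCHAR2(10),
  salary         NUMBER(8,2),
  commission_pct NUMBER(2,2),
  manager_id     NUMBER(6),
  department_id  NUMBER(4),
  -- The three columns that do not exist in hr.employees (types assumed):
  upd_date       DATE,          -- when the change was applied
  user_name      VARCHAR2(30),  -- who made the change
  action         VARCHAR2(30)   -- INSERT, UPDATE or DELETE
);
```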

Grant all access to the audit table to the streams admin user:

grant all on hr.employee_audit to strmadmin; 

We connect as the streams admin user:

conn strmadmin/strmadmin 

We can create a logging table. You would NOT want to do this in a high-volume production system. I am doing this to illustrate user defined monitoring and show how you can get inside the capture process.

CREATE TABLE streams_monitor ( date_and_time TIMESTAMP(6) DEFAULT systimestamp, txt_msg CLOB ); 

Here we create the queue. Unlike AQ, where you have to create a separate table, this step creates the queue and the underlying ANYDATA table.

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'strmadmin.streams_queue_table',
    queue_name  => 'strmadmin.streams_queue');
END;
/

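This adds the capture rules: we want to capture DML on hr.employees and not DDL. If the capture process capture_emp does not exist yet, this call also creates it.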

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'hr.employees',
    streams_type   => 'capture',
    streams_name   => 'capture_emp',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    => true,
    include_ddl    => false,
    inclusion_rule => true);
END;
/

Tell the capture process that we want to know who made the change:

BEGIN
  DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(
    capture_name   => 'capture_emp',
    attribute_name => 'username',
    include        => true);
END;
/

We also need to tell Oracle where to start our capture. Change the source_database_name to match your database.

DECLARE
  iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
  iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name   => 'hr.employees',
    source_database_name => 'ORCL',
    instantiation_scn    => iscn);
END;
/
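
If you are not sure what to use in place of 'ORCL', one way to look it up is the database's global name. This is only a sketch: it assumes the global name is the right value for your setup, which is the usual case for a local capture and apply like this one.

```sql
-- Show the global name of the database you are connected to.
-- Use the returned value wherever this walkthrough has 'ORCL'.
SELECT global_name FROM global_name;
```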

And the fun part! This is where we define our DML handler, the procedure that gets called for each captured change. I'm taking this right from the docs but I'm adding a couple of steps.

CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN ANYDATA) IS
  lcr        SYS.LCR$_ROW_RECORD;
  rc         PLS_INTEGER;
  command    VARCHAR2(30);
  old_values SYS.LCR$_ROW_LIST;
BEGIN
  -- Access the LCR
  rc := in_any.GETOBJECT(lcr);
  -- Get the object command type
  command := lcr.GET_COMMAND_TYPE();

  -- I am inserting the XML equivalent of the LCR into the monitoring table.
  insert into streams_monitor (txt_msg)
  values (command || DBMS_STREAMS.CONVERT_LCR_TO_XML(in_any));

  -- Set the command_type in the row LCR to INSERT
  lcr.SET_COMMAND_TYPE('INSERT');
  -- Set the object_name in the row LCR to EMPLOYEE_AUDIT
  lcr.SET_OBJECT_NAME('EMPLOYEE_AUDIT');

  -- Set the new values to the old values for update and delete
  IF command IN ('DELETE', 'UPDATE') THEN
    -- Get the old values in the row LCR
    old_values := lcr.GET_VALUES('old');
    -- Set the new values in the row LCR to the old values
    lcr.SET_VALUES('new', old_values);
    -- Set the old values in the row LCR to NULL
    lcr.SET_VALUES('old', NULL);
  END IF;

  -- Add a SYSDATE for upd_date
  lcr.ADD_COLUMN('new', 'UPD_DATE', ANYDATA.ConvertDate(SYSDATE));
  -- Add a user column
  lcr.ADD_COLUMN('new', 'user_name', lcr.GET_EXTRA_ATTRIBUTE('USERNAME'));
  -- Add an action column
  lcr.ADD_COLUMN('new', 'ACTION', ANYDATA.ConvertVarChar2(command));

  -- Make the changes
  lcr.EXECUTE(true);
  commit;
END;
/

Create the DML handlers:

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'UPDATE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'DELETE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/

Create the apply rule. This tells Streams, yet again, that we do in fact want these changes, this time on the apply side. The second call tells Streams where to put the info. Change the source_database value to match your database.

DECLARE
  emp_rule_name_dml VARCHAR2(30);
  emp_rule_name_ddl VARCHAR2(30);
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'hr.employees',
    streams_type    => 'apply',
    streams_name    => 'apply_emp',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     => true,
    include_ddl     => false,
    source_database => 'ORCL',
    dml_rule_name   => emp_rule_name_dml,
    ddl_rule_name   => emp_rule_name_ddl);
  DBMS_APPLY_ADM.SET_ENQUEUE_DESTINATION(
    rule_name              => emp_rule_name_dml,
    destination_queue_name => 'strmadmin.streams_queue');
END;
/

NOTE: An error was noticed (by several readers) and a fix was posted on OTN so I wanted to get the fix in here:

Add this code, as STRMADMIN:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'hr.employees',
    streams_type   => 'dequeue',
    streams_name   => 'emp_deq',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    => true,
    include_ddl    => false,
    inclusion_rule => true);
END;
/

I haven't tested this myself but it does make sense based on the error people were receiving (ORA-26694). I did have dequeues in my original application so I must have missed that when making this example. Thanks OTN!

We don't want to stop applying changes when there is an error, so:

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_emp',
    parameter  => 'disable_on_error',
    value      => 'n');
END;
/

Turn on the apply process:

BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name => 'apply_emp');
END;
/

Turn on the capture process:

BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name => 'capture_emp');
END;
/
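
Before making any changes, it can help to confirm that both processes actually started. A minimal sketch, assuming you are still connected as strmadmin (who was granted DBA earlier and so can read the DBA_ views):

```sql
-- Status of the capture process created above.
SELECT capture_name, status
  FROM dba_capture
 WHERE capture_name = 'CAPTURE_EMP';

-- Status of the apply process created above.
SELECT apply_name, status
  FROM dba_apply
 WHERE apply_name = 'APPLY_EMP';
```

Both queries should show a status of ENABLED once the processes are running.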

Connect as HR and make some changes to Employees.

sqlplus hr/hr

INSERT INTO hr.employees VALUES(207, 'JOHN', 'SMITH', 'JSMITH@MYCOMPANY.COM', NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;

UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;

DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;

It takes a few seconds for the data to make it to the logs and then back into the system to be applied. Run this query until you see data (remembering that it is not instantaneous):

SELECT employee_id, first_name, last_name, upd_Date, action FROM hr.employee_audit ORDER BY employee_id; 
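
If the audit table stays empty for more than a minute or so, the apply error queue is a reasonable place to look. Because disable_on_error is set to 'n', failed transactions collect there while the apply process keeps running. A minimal sketch, assuming a connection as strmadmin or another user who can read the DBA_ views:

```sql
-- List any transactions the apply process failed to apply.
SELECT apply_name,
       local_transaction_id,
       error_number,
       error_message
  FROM dba_apply_error
 WHERE apply_name = 'APPLY_EMP';
```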

Then you can log back into the streams admin account:

sqlplus strmadmin/strmadmin 

View the XML LCR that we inserted during the capture process:

set long 9999
set pagesize 0
select * from streams_monitor;

That's it! It's really not that much work to capture and apply changes. Of course, it's a little bit more work to cross database instances, but it's not that much. Keep an eye out for a future entry where I do just that. One of the things that amazes me is how little code is required to accomplish this. The less code I have to write, the less code I have to maintain.

Source: ITPUB blog, link: http://blog.itpub.net/11417069/viewspace-689275/. If you repost this, please credit the source; otherwise legal action may be pursued.
