ITPub博客

首页 > Linux操作系统 > Linux操作系统 > 使用Oracle内建功能构建ETL流程

使用Oracle内建功能构建ETL流程

原创 Linux操作系统 作者:xz43 时间:2011-01-27 15:55:59 0 删除 编辑
经常听到客户抱怨一些企业级的数据仓库软件许可证过于昂贵;也曾经见到过一次性的Access到 Oracle的数据迁移就使用了DataStage(这是真的!在发现 Server版本不能连接Access后,还专门购买了DataStage for ODBC --__--!)。实际上一些简单的ETL流程完全可以使用Oracle内建的功能完成。本文的主旨在于说明这一思想,文中代码未必是最佳实践。本文中所有的代码经过了测试,但一些错误与疏忽还是难免的。如果你发现了错误,或者有不同的观点,欢迎讨论。

    概述
    假如把一个完整的数据仓库系统比作一个提供美味菜肴的餐馆,那么ETL系统就是这个餐馆的厨房。在大多数情况下,ETL对最终用户来说是透明的,正像食客往往不需要进入厨房获取服务,厨房也不会对食客开放。但是,ETL模块常常占用整个数据仓库项目70%以上的资源,实际情况也大概也是如此,厨师的工资比侍者要高出很多。

    在当前的数据仓库项目中,企业级ETL工具昂贵的许可证、培训费用常常使许多中小型用户望而却步。真实世界的应用中相当一部分的数据源(E) 都是以文件或数据库的方式提供,进行的转换 (T) 也可以在数据库之内完成,基于这一实际情况,利用Oracle自身的一些功能,我们可以构建相对简单的ETL流程。

    在Oracle9i中引入了外部表(External Table)的概念,通过访问驱动程序ORACLE_LOADER可以对数据库之外的文件进行只读的SQL查询。在Oracle10g中,提供了第二个访问驱动程序ORACLE_DATAPUMP,允许用户对外部表进行写操作,写出的结果文件以特定的结构保存并允许DPAPI访问,即该结果文件可以被加载到另一个数据库中。同时,新的访问驱动程序也支持列对象的投影。

    配合10g中新增的DBMS_SCHEDULER包,可以很方便地实现ETL工具的定时调度功能。这个新的DBMS_SCHEDULER包取代了原有的DBMS_JOB包,并提供了更加强大完善的功能,笔者在本文中仅举一简单示例说明。 

    Ralph Kimball在他的中提出了ECCD(Extract-Clean-Conform-Deliver)的架构,在此文章中笔者将使用ECCD的四个步骤进行描述:源系统通过FTP提供文件格式的数据源文件,使用ORACLE_LOADER访问驱动程序使用该文件构建外部表(抽取),与数据库中的标准数据表进行校验并写入对应的Staging表(清洗与整合),处理的结果通过ORACLE_DATAPUMP访问驱动程序写入目标文件(分发)。

    准备工作
    在使用外部表之前,首先要建立DIRECTORY对象。同时给需要进行外部表操作的用户赋予适当的权限。

SQL> CREATE OR REPLACE DIRECTORY source_dir as '/home/oracle/backup';

Directory created

SQL> CREATE OR REPLACE DIRECTORY target_dir as '/home/oracle/backup/target';

Directory created

SQL> CREATE OR REPLACE DIRECTORY log_dir as '/home/oracle/backup/log';

Directory created

SQL> grant read on directory source_dir to test;

授权成功.

SQL> grant read on directory target_dir to test;

授权成功.

SQL> grant read on directory log_dir to test;

授权成功.

SQL>grant connect, dba to test identified by test;

授权成功.

数据抽取— ORACLE_LOADER
    平文件的数据源一般有两种格式:定长格式或者分隔符格式,ORACLE_LOADER对这两种格式都提供支持。如果是分隔符格式的源文件,使用关键字FIELDS TERMINATED BY指定分隔符;如果是定长格式文件,使用关键字POSITION指定数据列的位置。

$ pwd
/home/oracle/backup
$ ls
target  log  product1.dat
$ cat product1.dat
1,Bicycle,JiangSu
2,Camps,ZheJiang
3,Wearings,SiChuan
4,Gloves,SiChuan
5,Food,YunNan
6,Shoes,NULL
$

使用EXTERNAL ORGANIZATIONAL关键字构建外部表,注意DRIVER关键字指定了ORACLE_LOADER驱动;DEFAULT DIRECTORY子句指定了默认的操作目录;在ACCESS PARAMETERS子句中定义了分隔符、坏文件、LOG文件;LOCATION关键字指定了外部表依存的数据文件,如果存在多个文件,使用逗号分隔。

SQL> ed

Wrote file afiedt.buf

CREATE TABLE stenny_ext_product
(product_id NUMBER(4),
product_name VARCHAR2(20),
location VARCHAR2(25)
)
ORGANIZATION EXTERNAL
(
TYPE ORACLE_LOADER
DEFAULT DIRECTORY source_dir
ACCESS PARAMETERS
(
records delimited by newline
badfile log_dir:'bad_product.dat'
logfile log_dir:'product.log'
fields terminated by ','
missing field values are null
( product_id, product_name, location )
)
LOCATION ('product1.dat')
)
 REJECT LIMIT UNLIMITED

SQL> /

Table created

SQL> select * from stennY_ext_product;

PRODUCT_ID PRODUCT_NAME         LOCATION
---------- -------------------- -------------------------
         1 Bicycle              JiangSu
         2 Camps                ZheJiang
         3 Wearings             SiChuan
         4 Gloves               SiChuan
         5 Food                 YunNan
         6 Shoes                NULL

6 rows selected

SQL>

经过上面的步骤,我们将数据库之外的一个平文件通过访问驱动程序ORACLE_LOADER与一个数据库表STENNY_EXT_PRODUCT建立了映射关系。我们对这个外部表可以进行排序,表连接等只读操作。

 
    数据清洗与数据整合
    在典型的数据仓库系统中,事实表中的大部分字段都采用KEY的形式进行存储。在我们的例子中,我们将对LOCATION列进行LOOKUP,满足地区标准的数据被视为正确数据并附以标准键值;如果出现在标准表中不存在的纪录,我们将认为其为脏数据。脏数据将被另行处理。我们的标准表为LOC_STD,正确记录的Staging表为STG_PRODUCT,异常记录的Staging表为STG_EXCEP。在数据文件中,ID为6的产品没有对应的Location,该记录将被视为异常记录。通过一个简单的存储过程,我们将STENNY_EXT_PRODUCT与LOC_STD表进行连接,正确的记录与异常记录将被写入对应的Staging表中。
SQL> --stg_excep
SQL> Create table stg_excep as select * from stenny_ext_product where 1=2;
Table created
SQL> --stg_product
SQL> CREATE TABLE STG_PRODUCT
  2  (
  3  PRODUCT_ID NUMBER,
  4  PRODUCT_NAME VARCHAR2(20),
  5  LOC_ID NUMBER
  6  );
Table created
SQL> --loc_std
SQL> CREATE TABLE LOC_STD
  2  (
  3  LOC_ID NUMBER,
  4  LOC_NAME VARCHAR2(20)
  5  );
Table created
SQL> INSERT INTO LOC_STD ( LOC_ID, LOC_NAME ) VALUES ( 1, 'JiangSu');
1 row inserted
SQL> INSERT INTO LOC_STD ( LOC_ID, LOC_NAME ) VALUES ( 2, 'ZheJiang');
1 row inserted
SQL> INSERT INTO LOC_STD ( LOC_ID, LOC_NAME ) VALUES ( 3, 'SiChuan');
1 row inserted
SQL> INSERT INTO LOC_STD ( LOC_ID, LOC_NAME ) VALUES ( 4, 'YunNan');
1 row inserted
SQL> commit;
Commit complete
SQL> --proc_txn_product
SQL> CREATE OR REPLACE PROCEDURE proc_txn_product AS
  2  BEGIN
  3    insert into stg_product
  4      select product_id, product_name, loc_id
  5        from stenny_ext_product, loc_std
  6       where loc_std.loc_name = stenny_ext_product.location;
  7    insert into stg_excep
  8      select *
  9        from stenny_ext_product
 10       where product_id not in (select product_id from stg_product);
 11    commit;
 12  END proc_txn_product;
 13  /
Procedure created
SQL>
 
执行存储过程可以得到如下的结果:
 
SQL> exec proc_txn_product;
PL/SQL procedure successfully completed
SQL> select * from stg_product;
PRODUCT_ID PRODUCT_NAME             LOC_ID
---------- -------------------- ----------
         1 Bicycle                       1
         2 Camps                         2
         3 Wearings                      3
         4 Gloves                        3
         5 Food                          4
SQL> select * from stg_excep;
PRODUCT_ID PRODUCT_NAME         LOCATION
---------- -------------------- -------------------------
         6 Shoes                NULL
SQL>
 
    数据分发 – ORACLE_DATAPUMP
    在经过了上面的抽取、清洗整合的步骤后,我们将使用ORACLE_DATAPUMP将staging表中的数据写入目标文件中。典型的数据仓库系统最终的目标是数据库,而本文中的ECCD流程可以是数据准备的一个过程,众所周知,数据仓库的刷新频率与Staging区域可能不同,目标文件可以被传输到数据仓库系统,并按照其刷新频率进行第二步加载。顺便提一句,DBMS_SCHEDULER可以用来完成在数据库之间进行文件传输。 

    下面的代码演示了如何使用ORACLE_DATAPUMP驱动访问程序unload数据库表的过程。

SQL> ed

Wrote file afiedt.buf

CREATE TABLE tgt_product
ORGANIZATION EXTERNAL (TYPE ORACLE_DATAPUMP
DEFAULT DIRECTORY target_dir
LOCATION ('tgt_product.dmp'))
PARALLEL 2
AS
SELECT product_id,
product_name,
loc_id
FROM stg_product

SQL> /

Table created

SQL> select * from tgt_product;

PRODUCT_ID PRODUCT_NAME             LOC_ID
---------- -------------------- ----------
         1 Bicycle                       1
         2 Camps                         2
         3 Wearings                      3
         4 Gloves                        3
         5 Food                          4

SQL>

同时,在target_dir目录下可以看到创建的TGT_PRODUCT.DMP文件,该文件可以被加载到真正的目标数据库,但是必须使用ORACLE_DATAPUMP访问驱动程序。

定时调度 – DBMS_SCHEDULER
    首先我们将使用proc_file_watcher存储过程每5分钟对某一个目录进行扫描等待文件。如果文件存在,调用proc_txn_product对文件进行处理,否则睡眠5分钟。这个文件扫描过程在周二早上4点钟被激活,如果超过8点文件还是没有到,可以抛出用户异常或者调用UTL_SMTP包发邮件通知相关责任人。

create or replace procedure proc_file_watcher is
  v_exists      boolean;
  v_file_length number;
  v_blocksize   number;
begin
  <>
  if to_char(sysdate, 'hh24') >= '08' then
    --³¬Ê±£¬¿ÉÒÔµ÷ÓÃUTL_SMTP
    null;
  else
    utl_file.fgetattr('SOURCE_DIR',
                      'product1.dat',
                      v_exists,
                      v_file_length,
                      v_blocksize);
    if v_exists then
      dbms_output.put_line('File there!');
      proc_txn_product;
    else
      dbms_output.put_line('404 Error');
      dbms_lock.sleep(300);
      goto L_sleeping_child;
    end if;
  end if;
end proc_file_watcher;

现在,我们只需要将这个文件扫描程序加入调度,每个周二早上4点钟开始运行即可。
    --创建程序
BEGIN
  DBMS_SCHEDULER.CREATE_PROGRAM(program_name   => 'TEST.STP_PROC_FILE_WATCHER',
                                program_action => 'TEST.PROC_FILE_WATCHER',
                                program_type   => 'STORED_PROCEDURE',
                                comments       => 'Firing the ETL process if file arrives',
                                enabled        => TRUE);
END;

BEGIN
  SYS.DBMS_SCHEDULER.CREATE_SCHEDULE(repeat_interval => 'FREQ=WEEKLY;BYDAY=TUE;BYHOUR=8;BYMINUTE=0;BYSECOND=0',
                                     start_date      => to_timestamp_tz('2011-01-27 US/Central',
                                                                        'YYYY-MM-DD TZR'),
                                     comments        => 'Tuesday AM Schedule',
                                     schedule_name   => '"TEST"."SCS_TXN_PROD"');
END;

BEGIN
  SYS.DBMS_SCHEDULER.CREATE_JOB(job_name      => 'TEST.SCJ_TXN_PROD',
                                program_name  => 'TEST.STP_PROC_FILE_WATCHER',
                                schedule_name => 'TEST.SCS_TXN_PROD',
                                comments      => 'Start the ETL process on Tuesday',
                                auto_drop     => FALSE,
                                enabled       => TRUE);
END;

 

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/9399028/viewspace-686396/,如需转载,请注明出处,否则将追究法律责任。

上一篇: Oracle Undo的学习
请登录后发表评论 登录
全部评论
鱼儿的学习空间

注册时间:2010-11-16

  • 博文量
    422
  • 访问量
    1755526