ITPub博客

首页 > Linux操作系统 > Linux操作系统 > 用Shell脚本&sqlloader做ETL

用Shell脚本&sqlloader做ETL

原创 Linux操作系统 作者:taogchan 时间:2012-04-25 16:53:37 0 删除 编辑
CREATE OR REPLACE PROCEDURE ETL_ADM.SP_ETL_UNLOAD
/*
    Author: hcw
    Created: 20111018
    Pre-required:
      1. database grant 
      2. srcdb: create table & input data
            etl_tab_list
             ETL_TAB_cols
             etl_exe_date     
             ETL_UNLOAD_CNT
    Purpose:
         1. 两个系统之间卸数、装数的所需脚本自动生成。
            包含:1 各表卸数的SQL: OWNER_TABLENAME.sql                
                  2 卸数shell文件  etlid_srcdbname_unload.sh                
                  3 卸数的计数SQL文件
                  4 truncate ods 临时表的SQL文件
                  5 跑批调用的shell文件
                  6 就绪文件
  
    adjust before run:
         1. 修改源、目的数据库用户密码
         2. 使用共享存储的文件读写权限因不同机器可能出现异常
            为此需要设置CHMOD
  
    如何执行
         执行ODS_SHL_DIR下当日的两个shell文件,
         先在源系统执行UNLOAD,然后在目标系统执行load。
  
    update
     2011-11-07: 增加源、目的TNS
                 增加 chmod 777 文件名
          11-08 :增加关联翻牌表 new_date
          11-14: 增加DAT、LOG、RDY文件的日期标签
                 拆分成unload 与 load都可以增加日期
  */
(p_srcdb          VARCHAR2, --src dbname
 p_desdb          VARCHAR2, --des dbname
 p_etl_id         VARCHAR2, --ETL 1 2 3 4 5,主题来源标识:FOC 飞行,CUST 客户
 p_src_dbuser     VARCHAR2,
 p_src_dbpwd      VARCHAR2,
 p_des_dbuser     VARCHAR2,
 p_des_dbpwd      VARCHAR2,
 p_directory_name VARCHAR2) IS
  CURSOR cur_odsTab IS
    SELECT *
      FROM etl_tab_list a
     WHERE is_used = '1'
       AND a.SRC_TNS = p_srcdb
       AND a.DES_TNS = p_desdb
       AND (A.ETLID = p_etl_id OR A.ETLID IS NULL) -----主题来源标识:FOC 飞行,CUST 客户  BY TAOGCHAN 20120306
    ;

  v_tabrow etl_tab_list%ROWTYPE;
  CURSOR cur_collist IS
    SELECT column_name, data_type, data_length
      FROM etl_tab_cols
     WHERE wner = v_tabrow.src_owner
       AND table_name = v_tabrow.SRC_tab_name
     ORDER BY internal_column_id;

  sqlfHandle UTL_FILE.file_type; --各表卸数的SQL: OWNER_TABLENAME.sql
  unlfHandle UTL_FILE.file_type; --卸数shell文件  etlid_srcdbname_unload.sh
  cntfHandle UTL_FILE.file_type; --卸数的计数SQL文件

  v_filename    VARCHAR2(64);
  v_tmpFileName VARCHAR2(64);
  v_datdir      VARCHAR2(64);
  v_logdir      VARCHAR2(64);
  v_ssldir      VARCHAR2(64);
  v_cfgdir      VARCHAR2(64);
  v_rdydir      VARCHAR2(64);
  v_rootdir     VARCHAR2(64);

  v_sql      VARCHAR2(32767);
  v_colName  VARCHAR2(32);
  v_datatype VARCHAR2(32);
  v_qrystr   VARCHAR2(256);
  v_colLen   INTEGER;
  v_linelen  INTEGER;

  v_directory VARCHAR2(64);
BEGIN
  v_directory := p_directory_name;

  --SELECT max(new_date)  INTO v_bizdate FROM ETL_EXE_DATE;
  --- 1 
  SELECT directory_path
    INTO v_rootdir
    FROM dba_directories
   WHERE directory_name = 'ODS_DIR';
  SELECT directory_path
    INTO v_datdir
    FROM dba_directories
   WHERE directory_name = v_directory || '_DAT';
  SELECT directory_path
    INTO v_logdir
    FROM dba_directories
   WHERE directory_name = v_directory || '_LOG';
  SELECT directory_path
    INTO v_ssldir
    FROM dba_directories
   WHERE directory_name = v_directory || '_SSL';
  SELECT directory_path
    INTO v_cfgdir
    FROM dba_directories
   WHERE directory_name = v_directory || '_CFG';
  SELECT directory_path
    INTO v_rdydir
    FROM dba_directories
   WHERE directory_name = v_directory || '_RDY';

  ---  2 SHELL
  v_tmpFileName := p_etl_id || '_' || p_srcdb || '_unload.sh';
  unlfHandle    := UTL_FILE.FOPEN(v_directory || '_SSL',
                                  v_tmpFileName,
                                  'W',
                                  32767);
  utl_file.put_line(unlfHandle, 'export LANG=zh;');
  utl_file.put_line(unlfHandle, 'export NLS_LANG=''Simplified Chinese_China.zhs16gbk'';');
  utl_file.put_line(unlfHandle, 'cd ' || v_ssldir);

  ---  4 卸数计数SQL文件
  v_tmpFileName := p_etl_id || '_' || p_srcdb || '_cnt.sql';
  cntfHandle    := UTL_FILE.FOPEN(v_directory || '_CFG',
                                  v_tmpFileName,
                                  'W',
                                  32767);
  utl_file.put_line(cntfHandle, 'set echo off;');
  utl_file.put_line(cntfHandle, 'set feedback off;');
  utl_file.put_line(cntfHandle, 'set heading off;');
  utl_file.put_line(cntfHandle, 'set pagesize 0;');
  utl_file.put_line(cntfHandle, 'set termout off');
  utl_file.put_line(cntfHandle, 'set verify off');
  utl_file.put_line(cntfHandle, 'set trimout on;');
  utl_file.put_line(cntfHandle, 'set trimspool on;');
  utl_file.put_line(cntfHandle, 'set linesize 5000;');
  utl_file.put_line(cntfHandle, 'SET TRIMSPOOL ON;');
  utl_file.put_line(cntfHandle, 'set tab off;');
  utl_file.put_line(cntfHandle,
                    'spool ' || v_datdir || '/' || p_etl_id || '_' ||
                    p_srcdb || '_cnt_&1\.txt');

  utl_file.put_line(unlfHandle,
                    'echo begin >' || v_rdydir || '/' || p_srcdb ||
                    '_$1.rdy');
  OPEN cur_odsTab;
  LOOP
    FETCH cur_odsTab
      INTO v_tabrow;
    EXIT WHEN cur_odsTab%NOTFOUND;
    v_filename := v_tabrow.src_owner || '_' || v_tabrow.src_tab_name;
    v_linelen  := 5000;
  
    v_sql    := 'SELECT ';
    v_qrystr := '';
    OPEN cur_collist;
    LOOP
      FETCH cur_collist
        INTO v_colName, v_datatype, v_colLen;
      EXIT WHEN cur_collist%NOTFOUND;
      --- 字段数据(包含了LINUX下的回车换行)异常引起行数不对
      --1源表有data_date,2目标表有data_date
      IF v_tabrow.add_clo_ctrl = '1' AND v_colName = 'DATA_DATE' THEN
        --1源表有,目标表没有,取数的sql去掉data_date字段
        NULL;
      ELSE
        CASE
          WHEN v_datatype IN ('NUMBER', 'FLOAT', 'LONG') THEN
            v_sql := v_sql || 'to_char("' || v_colName || '")||''|@|''||' ||
                     chr(13) || chr(10);
          WHEN v_datatype = 'DATE' OR
               substr(v_datatype, 1, 9) = 'TIMESTAMP' THEN
            v_sql := v_sql || 'to_char("' || v_colName ||
                     '",''yyyy-mm-dd hh24:mi'')||''|@|''||' || chr(13) ||
                     chr(10);
          WHEN v_datatype IN ('NCHAR', 'VARCHAR2', 'NVARCHAR2', 'CHAR') THEN
            --v_sql := v_sql ||'trim("'|| v_colName||'")||'',''||'||chr(13)||chr(10);
            v_sql := v_sql || 'trim(replace(REPLACE("' || v_colName ||
                     '",CHR(10),NULL),CHR(13),NULL))||''|@|''||' || chr(13) ||
                     chr(10);
            --trim(replace(REPLACE(a.ADDRESS,CHR(10),NULL),CHR(13),NULL))
          ELSE
            NULL;
        END CASE;
      END IF;
    
    END LOOP;
    --2目标表有,源表没有,取数的sql增加日期信息到data_date    
    IF v_tabrow.add_clo_ctrl = '2' THEN
      --v_sql := v_sql || ''''||to_char(v_bizdate,'yyyy-mm-dd hh24:mi')||'''||''|@|''||'||chr(13)||chr(10);   
      v_sql := v_sql ||
               'to_char(to_date(''&1'',''yyyymmdd''),''yyyy-mm-dd hh24:mi'')' ||
               '||''|@|''||' || chr(13) || chr(10);
    END IF;
  
    IF length(v_sql) > 11 THEN
      v_sql := substr(v_sql, 1, length(v_sql) - 11) || chr(13) || chr(10);
    
      --begin 修改,增量模式下由col_name字段来限制数据范围
      IF v_tabrow.unload_type = '1' THEN
        --增量导出
        v_sql := v_sql || ' TABTEXT FROM ' || v_tabrow.src_owner || '.' ||
                 v_tabrow.src_tab_name || ' a ' || chr(13) || chr(10);
      
        IF v_tabrow.qry_str IS NOT NULL THEN
          v_qrystr := v_tabrow.qry_str;
        END IF;
      
        IF v_tabrow.col_name IS NOT NULL THEN
          IF v_qrystr IS NOT NULL THEN
            v_qrystr := v_qrystr || ' and ';
          END IF;
          v_qrystr := v_qrystr || ' a.' || v_tabrow.col_name || ' >= ' ||
                      nvl(v_tabrow.val_from, 'a.' || v_tabrow.col_name) ||
                      ' and a.' || v_tabrow.col_name || ' < ' ||
                      nvl(v_tabrow.val_to, 'a.' || v_tabrow.col_name);
        END IF;
      
      ELSE
        v_sql := v_sql || ' TABTEXT FROM ' || v_tabrow.src_owner || '.' ||
                 v_tabrow.src_tab_name || ' a ' || chr(13) || chr(10);
        IF v_tabrow.qry_str IS NOT NULL THEN
          v_qrystr := v_tabrow.qry_str;
        END IF;
      END IF;
      --end 修改,增量模式下由col_name字段来限制数据范围
    
      --begin data_date 字段处理 
      --1源表有data_date,2目标表有data_date
      IF v_tabrow.add_clo_ctrl = '1' THEN
        IF v_qrystr IS NULL THEN
          v_qrystr := ' a.data_date = to_date(''&1'',''yyyymmdd'') ';
        ELSE
          v_qrystr := v_qrystr ||
                      ' and a.data_date = to_date(''&1'',''yyyymmdd'') ';
        END IF;
      ELSIF v_tabrow.add_clo_ctrl = '2' THEN
        NULL;
      END IF;
      --end data_date 字段处理 
    
      IF v_qrystr IS NOT NULL THEN
        v_sql := v_sql || ' where ' || v_qrystr;
      END IF;
    
      v_sql := v_sql || ';';
    
    ELSE
      v_sql := '';
    END IF;
  
    CLOSE cur_collist;
  
    --- 6 卸数DAT的SQL文件
    v_tmpFileName := v_filename || '.sql';
    sqlfHandle    := UTL_FILE.FOPEN(v_directory || '_CFG',
                                    v_tmpFileName,
                                    'W',
                                    32767);
    utl_file.put_line(sqlfHandle, 'set echo off;');
    utl_file.put_line(sqlfHandle, 'set feedback off;');
    utl_file.put_line(sqlfHandle, 'set heading off;');
    utl_file.put_line(sqlfHandle, 'set pagesize 0;');
    utl_file.put_line(sqlfHandle, 'set linesize 5000;');
    utl_file.put_line(sqlfHandle, 'col tabtext format a5000;');
    utl_file.put_line(sqlfHandle, 'SET TRIMSPOOL ON;');
    utl_file.put_line(sqlfHandle, 'set tab off;');
    utl_file.put_line(sqlfHandle, 'set termout off;');
    utl_file.put_line(sqlfHandle, 'set verify off;');
    utl_file.put_line(sqlfHandle, 'set trimout on;');
    utl_file.put_line(sqlfHandle, 'set trimspool on;');
    utl_file.put_line(sqlfHandle,
                      'spool ' || v_datdir || '/' || v_filename ||
                      '_&1\.txt');
    utl_file.put_line(sqlfHandle, v_sql);
    utl_file.put_line(sqlfHandle, 'spool off;');
    utl_file.put_line(sqlfHandle, 'exit;');
    utl_file.fclose(sqlfHandle);
  
    --  写入SHELL文件
    utl_file.put_line(unlfHandle,
                      'sqlplus ' || p_src_dbuser || '/' || p_src_dbpwd || '@' ||
                      p_srcdb || '  @' || v_cfgdir || '/' || v_filename ||
                      '.sql $1');
    --- 9 READY文件
    utl_file.put_line(unlfHandle,
                      'cat ' || v_datdir || '/' || v_filename ||
                      '_$1.txt|wc -l |awk ''{print "' || v_tabrow.src_owner || ',' ||
                      v_tabrow.src_tab_name || '," $1}'' >>' || v_rdydir || '/' ||
                      p_srcdb || '_$1.rdy');
  
    --- 卸数的计数SQL文件
    v_sql := 'SELECT m_date||'',''||cnt_str FROM (
      select ''&1'' m_date,''' || p_etl_id || ',' || p_srcdb || ',' ||
             p_desdb || ',' || v_tabrow.src_owner || ',' ||
             v_tabrow.src_tab_name || ',''||to_char(COUNT(*)) cnt_str
      from ' || v_tabrow.src_owner || '.' ||
             v_tabrow.src_tab_name || ' a  ';
    IF v_qrystr IS NOT NULL THEN
      v_sql := v_sql || ' where ' || v_qrystr;
    END IF;
    v_sql := v_sql || ');';
  
    utl_file.put_line(cntfHandle, v_sql);
  
  END LOOP;
  CLOSE cur_odsTab;

  utl_file.put_line(cntfHandle, 'spool off;');
  utl_file.put_line(cntfHandle, 'exit;');
  utl_file.fclose(cntfHandle);

  -- 卸数的SHELL文件
  utl_file.put_line(unlfHandle,
                    'sqlplus ' || p_src_dbuser || '/' || p_src_dbpwd || '@' ||
                    p_srcdb || '  @' || v_cfgdir || '/' || p_etl_id || '_' ||
                    p_srcdb || '_cnt.sql $1');

  utl_file.put_line(unlfHandle, '');
  utl_file.put_line(unlfHandle, 'echo JOB RUNS SUCCESSFULLY');
  utl_file.fclose(unlfHandle);

END SP_ETL_UNLOAD;
/
-------------------------------------------------------------------------
CREATE OR REPLACE PROCEDURE ETL_ADM.SP_ETL_LOAD
/*
    Author: hcw
    Created: 20111018
    Pre-required:
      1. database grant
      2. srcdb: create table & input data
             etl_tab_list
             ETL_TAB_cols
             ETL_exe_date
      3. desdb:ETL_UNLOAD_CNT
             
  
    Purpose:
         1. 两个系统之间卸数、装数的所需脚本自动生成。
            包含:1 SQLLDR控制文件 OWNER_TABLENAME.ctl
                  2 装数shell文件  etlid_desdbname_load.sh
                  3 装数的计数SQL文件
                  4 truncate ods 临时表的SQL文件
                  5 跑批调用的shell文件
  
    adjust before run:
         1. 修改源、目的数据库用户密码
         2. 使用共享存储的文件读写权限因不同机器可能出现异常
            为此需要设置CHMOD
  
    如何执行
         执行ODS_SHL_DIR下当日的两个shell文件,
         先在源系统执行UNLOAD,然后在目标系统执行load。
  
    update
     2011-11-07: 增加源、目的TNS
                 增加 chmod 777 文件名
          11-08 :增加关联翻牌表 new_date
          11-14: 增加DAT、LOG、RDY文件的日期标签
                 拆分成unload 与 load都可以增加日期
  
  */
(p_srcdb          VARCHAR2, --src dbname
 p_desdb          VARCHAR2, --des dbname
 p_etl_id         VARCHAR2, --主题来源标识:FOC 飞行,CUST 客户
 p_src_dbuser     VARCHAR2,
 p_src_dbpwd      VARCHAR2,
 p_des_dbuser     VARCHAR2,
 p_des_dbpwd      VARCHAR2,
 p_directory_name VARCHAR2) IS
  CURSOR cur_odsTab IS
    SELECT *
      FROM ETL_TAB_LIST A
     WHERE is_used = '1'
       AND a.SRC_TNS = p_srcdb
       AND a.DES_TNS = p_desdb
       AND (A.ETLID = p_etl_id OR A.ETLID IS NULL) -----主题来源标识:FOC 飞行,CUST 客户  BY TAOGCHAN 20120306
    ;

  v_tabrow etl_tab_list%ROWTYPE;
  CURSOR cur_collist IS
    SELECT column_name, data_type, data_length
      FROM etl_tab_cols
     WHERE wner = v_tabrow.src_owner
       AND table_name = v_tabrow.SRC_TAB_NAME
     ORDER BY internal_column_id;

  ctlfHandle  UTL_FILE.file_type; --SQLLDR控制文件 OWNER_TABLENAME.ctl
  ldfHandle   UTL_FILE.file_type; --装数shell文件  etlid_desdbname_load.sh
  ldctfHandle UTL_FILE.file_type; --装数的计数SQL文件
  tsqlfHandle UTL_FILE.file_type; --truncate ods 临时表的SQL文件

  v_filename    VARCHAR2(64);
  v_tmpFileName VARCHAR2(64);
  v_datdir      VARCHAR2(64);
  v_logdir      VARCHAR2(64);
  v_ssldir      VARCHAR2(64);
  v_cfgdir      VARCHAR2(64);
  v_rdydir      VARCHAR2(64);
  v_rootdir     VARCHAR2(64);

  v_sql      VARCHAR2(8192);
  v_collist  VARCHAR2(4096);
  v_colName  VARCHAR2(32);
  v_datatype VARCHAR2(32);
  v_colLen   INTEGER;
  v_linelen  INTEGER;

  --v_bizDate  DATE;
  v_cnt       INTEGER;
  v_directory VARCHAR2(64);
  v_sh        VARCHAR2(8192);

  --v_new_date VARCHAR2(32);
BEGIN

  --SELECT max(new_date)  INTO v_bizdate FROM ETL_EXE_DATE;
  v_directory := p_directory_name;
  --- 1 获取数据库路径信息
  SELECT directory_path
    INTO v_rootdir
    FROM dba_directories
   WHERE directory_name = 'ODS_DIR';
  SELECT directory_path
    INTO v_datdir
    FROM dba_directories
   WHERE directory_name = v_directory || '_DAT';
  SELECT directory_path
    INTO v_logdir
    FROM dba_directories
   WHERE directory_name = v_directory || '_LOG';
  SELECT directory_path
    INTO v_ssldir
    FROM dba_directories
   WHERE directory_name = v_directory || '_SSL';
  SELECT directory_path
    INTO v_cfgdir
    FROM dba_directories
   WHERE directory_name = v_directory || '_CFG';
  SELECT directory_path
    INTO v_rdydir
    FROM dba_directories
   WHERE directory_name = v_directory || '_RDY';

  ---  3 装数SHELL文件
  v_tmpFileName := p_etl_id || '_' || p_desdb || '_load.sh';
  ldfHandle     := UTL_FILE.FOPEN(v_directory || '_SSL',
                                  v_tmpFileName,
                                  'W',
                                  32767);
  utl_file.put_line(ldfHandle, 'export LANG=zh;');
  utl_file.put_line(ldfHandle, 'export NLS_LANG=''Simplified Chinese_China.zhs16gbk'';');
  utl_file.put_line(ldfHandle, 'cd ' || v_ssldir);
  ---  3.1 truncate ods 临时表的SQL文件
  SELECT COUNT(*)
    INTO v_cnt
    FROM etl_tab_list a
   WHERE is_used = '1'
     AND a.SRC_TNS = p_srcdb
     AND a.DES_TNS = p_desdb
     AND a.NEED_TRUNC = '1';
  IF v_cnt > 0 THEN
    v_tmpFileName := p_etl_id || '_' || p_desdb || '_trunc.sql';
    tsqlfHandle   := UTL_FILE.FOPEN(v_directory || '_CFG',
                                    v_tmpFileName,
                                    'W',
                                    32767);
    utl_file.put_line(ldfHandle,
                      'sqlplus ' || p_des_dbuser || '/' || p_des_dbpwd || '@' ||
                      p_desdb || ' @' || v_cfgdir || '/' || p_etl_id || '_' ||
                      p_desdb || '_trunc.sql $1');
  END IF;

  ---  5 装数计数SQL文件
  v_tmpFileName := p_etl_id || '_' || p_desdb || '_cnt.sql';
  ldctfHandle   := UTL_FILE.FOPEN(v_directory || '_CFG',
                                  v_tmpFileName,
                                  'W',
                                  32767);

  --清除同批次的执行量统计
  v_sql := 'delete from etl_unload_cnt where src_db=''' || p_srcdb ||
           ''' and des_db=''' || p_desdb ||
           ''' and unload_date = to_date(''&1'',''yyyy-mm-dd'');';
  utl_file.put_line(ldctfHandle, v_sql);
  utl_file.put_line(ldctfHandle, 'commit;');
  utl_file.put_line(ldctfHandle, 'exit;');

  OPEN cur_odsTab;
  LOOP
    FETCH cur_odsTab
      INTO v_tabrow;
    EXIT WHEN cur_odsTab%NOTFOUND;
    v_filename := v_tabrow.src_owner || '_' || v_tabrow.src_tab_name;
  
    v_linelen := 5000;
  
    v_collist := '';
    OPEN cur_collist;
    LOOP
      FETCH cur_collist
        INTO v_colName, v_datatype, v_colLen;
      EXIT WHEN cur_collist%NOTFOUND;
      --1源表有data_date,2目标表有data_date
      IF v_tabrow.add_clo_ctrl = '1' AND v_colName = 'DATA_DATE' THEN
        --1源表有data_date,目标表没有,加载数时去掉data_date字段
        NULL;
      ELSE
        IF v_datatype = 'CHAR' THEN
          v_collist := v_collist || '"' || v_colName || '"    CHAR(' ||
                       to_char(v_collen) || '),';
        ELSIF v_datatype = 'DATE' OR substr(v_datatype, 1, 9) = 'TIMESTAMP' THEN
          v_collist := v_collist || '"' || v_colName ||
                       '"    DATE ''yyyy-mm-dd hh24:mi'',';
        ELSE
          v_collist := v_collist || '"' || v_colName || '",';
        END IF;
      END IF;
    END LOOP;
    --2 目标表有data_date字段,源表没有data_date字段,取数的sql增加日期信息到data_date    
    IF v_tabrow.add_clo_ctrl = '2' THEN
      v_collist := v_collist ||
                   '"DATA_DATE"    DATE ''yyyy-mm-dd hh24:mi'',';
    END IF;
  
    v_collist := substr(v_collist, 1, length(v_collist) - 1) ||
                 ' TERMINATED BY WHITESPACE ';
  
    CLOSE cur_collist;
    --- 7 装数sqlldr控制文件
    v_tmpFileName := v_filename || '.ctl';
    ctlfHandle    := UTL_FILE.FOPEN(v_directory || '_CFG',
                                    v_tmpFileName,
                                    'W',
                                    32767);
    utl_file.put_line(ctlfHandle, 'load data');
    utl_file.put_line(ctlfHandle, 'characterset zhs16gbk');
    utl_file.put_line(ctlfHandle,
                      'INTO TABLE ' || v_tabrow.DES_OWNER || '.' ||
                      v_tabrow.DES_TAB_NAME);
    utl_file.put_line(ctlfHandle, 'APPEND');
    utl_file.put_line(ctlfHandle, 'FIELDS TERMINATED BY ''|@|''');
    utl_file.put_line(ctlfHandle, 'trailing nullcols');
    utl_file.put_line(ctlfHandle, '(');
    utl_file.put_line(ctlfHandle, v_collist);
    utl_file.put_line(ctlfHandle, ')');
    utl_file.fclose(ctlfHandle);
    utl_file.put_line(ldfHandle,
                      'sqlldr ' || p_des_dbuser || '/' || p_des_dbpwd || '@' ||
                      p_desdb || ' data=' || v_datdir || '/' || v_filename ||
                      '_$1.txt bad=' || v_logdir || '/' || v_filename ||
                      '_$1.bad control=' || v_cfgdir || '/' || v_filename ||
                      '.ctl log=' || v_logdir || '/' || v_filename ||
                      '_$1.log silent=header,feedback,errors,discards,partitions rows=1000');
  
    --- 装数的计数SQL文件
    IF v_tabrow.NEED_TRUNC = '1' THEN
      IF v_tabrow.add_clo_ctrl = '2' THEN
        --2目标表使用data_date字段时,删除目标表当天和14天之前的数据                   
        v_sql := 'delete from ' || v_tabrow.DES_OWNER || '.' ||
                 v_tabrow.DES_TAB_NAME ||
                 ' where data_date = to_date(''&1'',''yyyymmdd'')' ||
                 ' or data_date < to_date(''&1'',''yyyymmdd'')-14 ' || ';';
      ELSE
        v_sql := 'truncate table ' || v_tabrow.DES_OWNER || '.' ||
                 v_tabrow.DES_TAB_NAME || ';';
      END IF;
    
      utl_file.put_line(tsqlfHandle, v_sql);
    END IF;
  
    --
    v_sh := v_sh || 'sh ' || v_rootdir || '/load_cnt.sh $1 ' || p_srcdb || ' ' ||
            p_desdb || ' ' || v_tabrow.src_tab_name || ' ' ||
            v_tabrow.src_owner || chr(10);
  
  END LOOP;
  CLOSE cur_odsTab;
  IF v_cnt > 0 THEN
    utl_file.put_line(tsqlfHandle, 'exit;');
    utl_file.fclose(tsqlfHandle);
  END IF;

  utl_file.fclose(ldctfHandle);

  -- 计数的控制文件
  v_tmpFileName := p_etl_id || '_' || p_srcdb || '_cnt.ctl';
  ctlfHandle    := UTL_FILE.FOPEN(v_directory || '_CFG',
                                  v_tmpFileName,
                                  'W',
                                  32767);
  utl_file.put_line(ctlfHandle, 'load data');
  utl_file.put_line(ctlfHandle, 'characterset zhs16gbk');

  utl_file.put_line(ctlfHandle, 'INTO TABLE ETL_UNLOAD_CNT');
  utl_file.put_line(ctlfHandle, 'APPEND');
  utl_file.put_line(ctlfHandle, 'FIELDS TERMINATED BY '',''');
  utl_file.put_line(ctlfHandle, 'trailing nullcols');
  utl_file.put_line(ctlfHandle,
                    '(unload_date DATE ''yyyy-mm-dd'',ETL_ID,src_db,des_db,OWNER,TABLE_NAME,src_row_cnt  TERMINATED BY WHITESPACE)');
  utl_file.fclose(ctlfHandle);

  utl_file.put_line(ldfHandle, 'fn=' || v_rootdir || '/etltns.properties');
  utl_file.put_line(ldfHandle,
                    'username=`grep "username" $fn|cut -d = -f 2`');
  utl_file.put_line(ldfHandle,
                    'password=`grep "password" $fn|cut -d = -f 2`');
  utl_file.put_line(ldfHandle,
                    'tnsname=`grep "tnsname" $fn|cut -d = -f 2`');

  utl_file.put_line(ldfHandle,
                    'sqlplus $username/$password@$tnsname @' || v_cfgdir || '/' ||
                    p_etl_id || '_' || p_desdb || '_cnt.sql $1');
  utl_file.put_line(ldfHandle,
                    '"sqlldr" $username/$password@$tnsname data=' ||
                    v_datdir || '/' || p_etl_id || '_' || p_srcdb ||
                    '_cnt_$1.txt bad=' || v_logdir || '/' || p_etl_id || '_' ||
                    p_srcdb || '_cnt_$1.bad control=' || v_cfgdir || '/' ||
                    p_etl_id || '_' || p_srcdb || '_cnt.ctl log=' ||
                    v_logdir || '/' || p_etl_id || '_' || p_srcdb ||
                    '_$1.log silent=header,feedback,errors,discards,partitions  rows=10');

  utl_file.put_line(ldfHandle, v_sh);
  utl_file.put_line(ldfHandle, 'echo JOB RUNS SUCCESSFULLY');
  utl_file.fclose(ldfHandle);

  /*EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line(SQLERRM);*/
END SP_ETL_LOAD;
/
CREATE TABLE ETL_TAB_LIST
(
  SRC_TNS       VARCHAR2(32 BYTE),
  DES_TNS       VARCHAR2(32 BYTE),
  SRC_OWNER     VARCHAR2(32 BYTE),
  SRC_TAB_NAME  VARCHAR2(32 BYTE),
  DES_OWNER     VARCHAR2(32 BYTE),
  DES_TAB_NAME  VARCHAR2(32 BYTE),
  UNLOAD_TYPE   CHAR(1 BYTE),
  IS_USED       CHAR(1 BYTE),
  COL_NAME      VARCHAR2(32 BYTE),
  COL_TYPE      VARCHAR2(32 BYTE),
  VAL_FROM      VARCHAR2(64 BYTE),
  VAL_TO        VARCHAR2(64 BYTE),
  QRY_STR       VARCHAR2(256 BYTE),
  NEED_TRUNC    CHAR(1 BYTE)                    DEFAULT 1,
  ADD_CLO_CTRL  CHAR(1 BYTE),
  ETLID         VARCHAR2(10 BYTE)
)
TABLESPACE MDMDATA
PCTUSED    0
PCTFREE    10
INITRANS   1
MAXTRANS   255
STORAGE    (
            INITIAL          64K
            NEXT             8K
            MINEXTENTS       1
            MAXEXTENTS       UNLIMITED
            PCTINCREASE      0
            BUFFER_POOL      DEFAULT
           )
LOGGING 
NOCOMPRESS 
NOCACHE
NOPARALLEL
MONITORING;

COMMENT ON TABLE ETL_TAB_LIST IS 'ETL导出表清单';

COMMENT ON COLUMN ETL_TAB_LIST.ETLID IS '主题来源标识:FOC 飞行,CUST 客户';

COMMENT ON COLUMN ETL_TAB_LIST.SRC_TNS IS '源数据库TNS';

COMMENT ON COLUMN ETL_TAB_LIST.DES_TNS IS '目标数据库TNS';

COMMENT ON COLUMN ETL_TAB_LIST.SRC_OWNER IS '源数据库用户';

COMMENT ON COLUMN ETL_TAB_LIST.SRC_TAB_NAME IS '源表';

COMMENT ON COLUMN ETL_TAB_LIST.DES_OWNER IS '目标数据库用户';

COMMENT ON COLUMN ETL_TAB_LIST.DES_TAB_NAME IS '目标表';

COMMENT ON COLUMN ETL_TAB_LIST.UNLOAD_TYPE IS '加载类型,0全量1增量';

COMMENT ON COLUMN ETL_TAB_LIST.IS_USED IS '是否使用';

COMMENT ON COLUMN ETL_TAB_LIST.COL_NAME IS '增量列名';

COMMENT ON COLUMN ETL_TAB_LIST.COL_TYPE IS '增量列类型';

COMMENT ON COLUMN ETL_TAB_LIST.VAL_FROM IS '增量列起始值';

COMMENT ON COLUMN ETL_TAB_LIST.VAL_TO IS '增量列结束值';

COMMENT ON COLUMN ETL_TAB_LIST.QRY_STR IS '固定的查询条件';

COMMENT ON COLUMN ETL_TAB_LIST.NEED_TRUNC IS '目标表加载前是否清空';

COMMENT ON COLUMN ETL_TAB_LIST.ADD_CLO_CTRL IS '额外字段控制,用于判断data_date的使用方式,1源表有data_date,2目标表有data_date';

----------------------------------------
CREATE TABLE ETL_TAB_COLS
(
  OWNER               VARCHAR2(30 BYTE)         NOT NULL,
  TABLE_NAME          VARCHAR2(30 BYTE)         NOT NULL,
  COLUMN_NAME         VARCHAR2(30 BYTE)         NOT NULL,
  DATA_TYPE           VARCHAR2(106 BYTE),
  DATA_LENGTH         NUMBER                    NOT NULL,
  INTERNAL_COLUMN_ID  NUMBER                    NOT NULL
)
TABLESPACE MDMDATA
PCTUSED    0
PCTFREE    10
INITRANS   1
MAXTRANS   255
STORAGE    (
            INITIAL          64K
            NEXT             1M
            MINEXTENTS       1
            MAXEXTENTS       UNLIMITED
            PCTINCREASE      0
            BUFFER_POOL      DEFAULT
           )
LOGGING 
NOCOMPRESS 
NOCACHE
NOPARALLEL
MONITORING;



来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/22392018/viewspace-722235/,如需转载,请注明出处,否则将追究法律责任。

下一篇: Ext未定义问题
请登录后发表评论 登录
全部评论

注册时间:2009-08-28

  • 博文量
    200
  • 访问量
    1183814