
Building a Flink Connector for GBase8s, Step by Step

Category: Domestic Databases  Author: wj_2021  Date: 2021-07-26 10:12:33

 

 

 

 

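Introduction

This article first explains what a Flink connector and CDC are. It then walks through building a simple Flink connector for GBase8s, and finishes with a hands-on project: synchronizing data from MySQL to GBase8s in real time, using MySQL CDC as the source and the new connector as the sink.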

 

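What is a Flink connector

Flink has a few basic data sources and sinks built in that are always available. On top of these, it provides connectors for reading from external systems such as files, MySQL, RabbitMQ, Kafka, and Elasticsearch, and for writing data back to them.

Put simply, a Flink connector wraps the logic for loading data from a source and writing data to a sink. Once we add the corresponding connector dependency, we can load from and write to that system with very little code.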

 

What is CDC (Change Data Capture)

CDC stands for Change Data Capture. With CDC we can capture committed changes from a database and send them downstream for consumption. These changes include INSERT, DELETE, and UPDATE operations.

 

 

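Its main use cases are:

•  Data synchronization or backup between heterogeneous databases / building data analysis and computing platforms

•  Sharing data state between microservices

•  Updating caches / updating CQRS query views

CDC is a fairly broad concept: any technique that captures changed data can be called CDC. The two main approaches in the industry are query-based CDC and log-based CDC; the table below compares their features and differences.

[Table: comparison of query-based CDC and log-based CDC]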
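To make the query-based approach concrete, here is a minimal sketch, assuming the table is small enough to hold in memory as a map from primary key to row value. It compares two successive query snapshots and derives INSERT, UPDATE, and DELETE events. The names QueryBasedCdcSketch and diff are hypothetical and not part of Flink or any CDC library. Log-based CDC, by contrast, reads such events from the database log instead of computing them by comparison.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class QueryBasedCdcSketch {

    /** Compares two snapshots (primary key -> row value) and returns the change events. */
    public static List<String> diff(Map<Integer, String> before, Map<Integer, String> after) {
        List<String> events = new ArrayList<>();
        // Rows present in the new snapshot are either new or possibly changed.
        for (Map.Entry<Integer, String> e : after.entrySet()) {
            if (!before.containsKey(e.getKey())) {
                events.add("INSERT " + e.getKey() + "=" + e.getValue());
            } else if (!Objects.equals(e.getValue(), before.get(e.getKey()))) {
                events.add("UPDATE " + e.getKey() + "=" + e.getValue());
            }
        }
        // Rows that disappeared from the new snapshot were deleted.
        for (Integer key : before.keySet()) {
            if (!after.containsKey(key)) {
                events.add("DELETE " + key);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<Integer, String> before = new LinkedHashMap<>();
        before.put(1, "a");
        before.put(2, "b");
        Map<Integer, String> after = new LinkedHashMap<>();
        after.put(1, "a2");
        after.put(3, "c");
        System.out.println(diff(before, after));
    }
}
```

Note that a comparison like this only sees the state at each poll: a row that is changed several times between two polls produces a single event, which is one reason log-based CDC is usually preferred for real-time synchronization.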


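We could hand-write a Sink that pushes the CDC data straight into the target database, but that is not very elegant. Can we instead write data into GBase8s through Flink SQL? The answer is yes. Next, we will implement a simple Flink connector for GBase8s, flink-connector-gbasedbt, in three steps:

Build the row converter (RowConverter)

Build the dialect (Dialect)

Register the dynamic table factory (DynamicTableFactory) and the related Sink program

With these three steps done, we have a simple connector. Let's look at how to implement each one: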

 

Build the row converter (RowConverter)

package wang.datahub.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
 * @author lijiaqi
 */
public class GBasedbtRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public GBasedbtRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "gbasedbt";
    }

}

 

Build the dialect (Dialect)

package wang.datahub.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import wang.datahub.converter.GBasedbtRowConverter;

import java.util.Optional;

/**
 *
 * @author lijiaqi
 */
public class GBasedbtDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "gbasedbt";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:gbasedbt-sqli:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new GBasedbtRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long l) {
        return null;
    }

    @Override
    public void validate(TableSchema schema) throws ValidationException {
        JdbcDialect.super.validate(schema);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.gbasedbt.jdbc.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "'" + identifier + "'";
    }

    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return JdbcDialect.super.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
    }

    @Override
    public String getRowExistsStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getRowExistsStatement(tableName, conditionFields);
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        return JdbcDialect.super.getInsertIntoStatement(tableName, fieldNames);
    }

    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        return JdbcDialect.super.getUpdateStatement(tableName, fieldNames, conditionFields);
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getDeleteStatement(tableName, conditionFields);
    }

    @Override
    public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        return JdbcDialect.super.getSelectFromStatement(tableName, selectFields, conditionFields);
    }

}
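The canHandle method is what allows a JDBC URL to be matched to a dialect. The following is a simplified, dependency-free sketch of that idea, not Flink's actual lookup code: the UrlDialect interface and the select helper are hypothetical stand-ins, and only the jdbc:gbasedbt-sqli: prefix is taken from the dialect above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class DialectSelectionSketch {

    /** Hypothetical, reduced stand-in for a JDBC dialect: a name plus a URL check. */
    interface UrlDialect {
        String dialectName();
        boolean canHandle(String url);
    }

    /** Creates a dialect that accepts every URL starting with the given prefix. */
    static UrlDialect prefixDialect(String name, String prefix) {
        return new UrlDialect() {
            @Override public String dialectName() { return name; }
            @Override public boolean canHandle(String url) { return url.startsWith(prefix); }
        };
    }

    static final List<UrlDialect> DIALECTS = Arrays.asList(
            prefixDialect("mysql", "jdbc:mysql:"),
            prefixDialect("gbasedbt", "jdbc:gbasedbt-sqli:"));

    /** Returns the name of the first dialect that accepts the URL, if any. */
    public static Optional<String> select(String url) {
        return DIALECTS.stream()
                .filter(d -> d.canHandle(url))
                .map(UrlDialect::dialectName)
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(select("jdbc:gbasedbt-sqli://172.31.95.133:9088/t1"));
        System.out.println(select("jdbc:unknown://host/db"));
    }
}
```

In this article the table factory passes a GBasedbtDialect instance to the JDBC options explicitly, so no URL-based lookup is needed; canHandle still documents which URLs the dialect is meant for.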

 

 

Register the dynamic table factory (DynamicTableFactory) and the related Sink program

First, create GBasedbtSinkFunction, which receives RowData input and sinks it into the configured database

package wang.datahub.table;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @author lijiaqi
 */
public class GBasedbtSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;

    private final JdbcOptions jdbcOptions;
    private DataType dataType;

    private Connection conn;
    private Statement stmt;

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions) {
        this.jdbcOptions = jdbcOptions;
    }

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.dataType = dataType;
    }

    @Override
    public void open(Configuration parameters) {
        System.out.println("open connection !!!!!");
        try {
            if (null == conn) {
                Class.forName(jdbcOptions.getDriverName());
                conn = DriverManager.getConnection(jdbcOptions.getDbURL(), jdbcOptions.getUsername().orElse(null), jdbcOptions.getPassword().orElse(null));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {

        try {
            // Reuse one Statement instead of creating (and leaking) one per record
            if (stmt == null) {
                stmt = conn.createStatement();
            }
            String sql = "insert into " + this.jdbcOptions.getTableName() + " values ( ";
            for (int i = 0; i < value.getArity(); i++) {
                // Match on the actual field type here; only INT and string columns are handled
                if (dataType.getChildren().get(i).getConversionClass().equals(Integer.class)) {
                    sql += value.getInt(i) + " ,";
                } else {
                    sql += "'" + value.getString(i) + "' ,";
                }
            }
            // Drop the trailing comma
            sql = sql.substring(0, sql.length() - 1);
            sql += " ); ";

            System.out.println("sql ==>" + sql);

            stmt.execute(sql);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

}
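The sink above builds its SQL by concatenating values, which breaks on values that contain a single quote and handles only INT and string columns. One alternative is a PreparedStatement with ? placeholders. The sketch below is an assumption about how that could look, not the author's implementation: buildInsertSql and bind are hypothetical helpers, and the values are passed as plain Java objects rather than Flink RowData.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;

public class ParameterizedInsertSketch {

    /** Builds "insert into <table> values (?, ?, ...)" with one placeholder per column. */
    public static String buildInsertSql(String tableName, int arity) {
        if (arity <= 0) {
            throw new IllegalArgumentException("arity must be positive");
        }
        String placeholders = String.join(", ", Collections.nCopies(arity, "?"));
        return "insert into " + tableName + " values (" + placeholders + ")";
    }

    /** Binds values to the statement; JDBC parameter indexes start at 1. */
    public static void bind(PreparedStatement ps, Object[] values) throws SQLException {
        for (int i = 0; i < values.length; i++) {
            ps.setObject(i + 1, values[i]);
        }
    }

    public static void main(String[] args) {
        // Only the SQL text is printed here; no database connection is opened.
        System.out.println(buildInsertSql("ta", 3));
    }
}
```

In a sink function, the statement would typically be prepared once in open() and then bound and executed for each record in invoke(), so the driver handles quoting and type conversion.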

 

Build GBasedbtDynamicTableSink

package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType dataType;

    public GBasedbtDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat<SerializationSchema<RowData>> encodingFormat, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.encodingFormat = encodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        System.out.println("SinkRuntimeProvider");
        System.out.println(dataType);
        GBasedbtSinkFunction gbasedbtSinkFunction = new GBasedbtSinkFunction(jdbcOptions, dataType);
        return SinkFunctionProvider.of(gbasedbtSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new GBasedbtDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "gbasedbt Table Sink";
    }

}
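Because getChangelogMode returns the requested mode unchanged, the sink can receive updates and deletes from the CDC source as well as inserts, yet the sink function shown earlier turns every row into an INSERT. A minimal sketch of how change kinds could be mapped to statements follows. The ChangeKind enum is a local stand-in that mirrors the four values of Flink's RowKind; statementFor and the single key column and value column are illustrative assumptions.

```java
public class ChangeKindSketch {

    /** Local stand-in mirroring the values of Flink's RowKind. */
    public enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Returns a parameterized statement for the change, or null when nothing
     * needs to be written (the old image of an update).
     */
    public static String statementFor(ChangeKind kind, String table, String keyColumn, String valueColumn) {
        switch (kind) {
            case INSERT:
                return "insert into " + table + " (" + keyColumn + ", " + valueColumn + ") values (?, ?)";
            case UPDATE_AFTER:
                return "update " + table + " set " + valueColumn + " = ? where " + keyColumn + " = ?";
            case DELETE:
                return "delete from " + table + " where " + keyColumn + " = ?";
            case UPDATE_BEFORE:
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        for (ChangeKind kind : ChangeKind.values()) {
            System.out.println(kind + " -> " + statementFor(kind, "ta", "id", "name"));
        }
    }
}
```

In a real sink, the kind would come from RowData.getRowKind(), and the key columns from the PRIMARY KEY declared in the sink table's DDL.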

 

Build GBasedbtDynamicTableFactory

package wang.datahub.table;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import wang.datahub.dialect.GBasedbtDialect;

import java.util.HashSet;
import java.util.Set;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "gbasedbt";

    private static final String DRIVER_NAME = "com.gbasedbt.jdbc.Driver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> DRIVER = ConfigOptions
            .key("driver")
            .stringType()
            .defaultValue(DRIVER_NAME)
            .withDescription("the jdbc driver.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

//    public static final ConfigOption<String> FORMAT = ConfigOptions
//            .key("format")
//            .stringType()
//            .noDefaultValue()
//            .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
//        requiredOptions.add(FORMAT);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {

        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        final ReadableConfig config = helper.getOptions();

        helper.validate();

        JdbcOptions jdbcOptions = getJdbcOptions(config);

        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

        return new GBasedbtDynamicTableSource(jdbcOptions, physicalSchema);

    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {

        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

//        final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
//                SerializationFormatFactory.class,
//                FactoryUtil.FORMAT);

        final ReadableConfig config = helper.getOptions();

        helper.validate();

        JdbcOptions jdbcOptions = getJdbcOptions(config);

        final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

        return new GBasedbtDynamicTableSink(jdbcOptions, null, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new GBasedbtDialect());

        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }

}

 

Next, register the dynamic table factory through SPI: create the file resources/META-INF/services/org.apache.flink.table.factories.Factory with the content wang.datahub.table.GBasedbtDynamicTableFactory
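This registration relies on Java's standard ServiceLoader mechanism, which reads a provider file named after the service interface. The sketch below shows that mechanism in isolation, using only the JDK. TableFactoryLike and GBasedbtFactoryLike are hypothetical stand-ins for Flink's Factory interface and the factory above, and the provider file is written to a temporary directory purely so that the example is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class SpiSketch {

    /** Hypothetical stand-in for the service interface (Flink's Factory). */
    public interface TableFactoryLike {
        String factoryIdentifier();
    }

    /** Hypothetical provider; it must be public with a public no-arg constructor. */
    public static class GBasedbtFactoryLike implements TableFactoryLike {
        public GBasedbtFactoryLike() { }
        @Override public String factoryIdentifier() { return "gbasedbt"; }
    }

    /** Writes a provider file and returns the identifiers that ServiceLoader discovers. */
    public static List<String> discover() {
        try {
            Path root = Files.createTempDirectory("spi-sketch");
            Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
            // File name = service interface name; content = implementation class name.
            Files.write(services.resolve(TableFactoryLike.class.getName()),
                    GBasedbtFactoryLike.class.getName().getBytes(StandardCharsets.UTF_8));

            List<String> ids = new ArrayList<>();
            // The temporary directory acts as an extra classpath root holding the provider file.
            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] {root.toUri().toURL()}, SpiSketch.class.getClassLoader())) {
                for (TableFactoryLike f : ServiceLoader.load(TableFactoryLike.class, loader)) {
                    ids.add(f.factoryIdentifier());
                }
            }
            return ids;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(discover());
    }
}
```

In the connector project no such code is needed: placing the provider file under the resources directory is enough, and Flink matches the 'connector' option in the DDL against each discovered factory's factoryIdentifier().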

 

At this point our Flink connector is complete. Next, we will use it in a real project.

 

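Hands-on project

The diagram below shows the overall architecture of the project: Flink CDC captures change data from MySQL, and Flink SQL then sinks the data into GBase8s.

[Figure: overall architecture diagram]

Next, let's see how to implement CDC through Flink SQL. It takes only three SQL statements.

Create the source table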

        // Source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'flinkcdc',\n" +
                        " 'password' = '123456',\n" +
                        " 'database-name' = 'test',\n" +
                        " 'table-name' = 'test_cdc'\n" +
                        ")";

Create the output table, which writes to GBase8s. Here the connector option is set to gbasedbt

        String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=ol_gbasedbt1210_1;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=sysmaster;DB_LOCALE=en_US.819;";
        String userName = "gbasedbt";
        String password = "123456";
        String gbasedbtSinkTable = "ta";
        // Output (sink) table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'gbasedbt',\n" +
//                        " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + gbasedbtSinkTable + "'\n" +
                        ")";

Finally, write the source data straight into the sink table

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

 

Complete reference code

package wang.datahub.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToGBasedbtlMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'flinkcdc',\n" +
                        " 'password' = '123456',\n" +
                        " 'database-name' = 'test',\n" +
                        " 'table-name' = 'test_cdc'\n" +
                        ")";

        String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=ol_gbasedbt1210_1;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=sysmaster;DB_LOCALE=en_US.819;";
        String userName = "gbasedbt";
        String password = "123456";
        String gbasedbtSinkTable = "ta";
        // Output (sink) table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'gbasedbt',\n" +
//                        " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + gbasedbtSinkTable + "'\n" +
                        ")";

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql submits the INSERT job itself
        TableResult result = tableEnv.executeSql(transformSQL);

        result.print();
        // No env.execute() call here: this program defines no DataStream operators,
        // so env.execute() would fail with "No operators defined in streaming topology"
    }

}
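The DDL strings above are assembled by hand, which makes a missing comma or quote easy to introduce. As a small convenience, the WITH clause can be rendered from a map of options. This is a sketch under the assumption that option keys and values are plain strings; withClause is a hypothetical helper, not a Flink API. Single quotes inside values are doubled, which is the standard SQL way to escape them in a string literal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class WithClauseSketch {

    /** Renders the options as a WITH (...) clause, one quoted 'key' = 'value' pair per line. */
    public static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "  '" + escape(e.getKey()) + "' = '" + escape(e.getValue()) + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    /** Doubles single quotes so the text is safe inside a SQL string literal. */
    private static String escape(String s) {
        return s.replace("'", "''");
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "gbasedbt");
        options.put("table-name", "ta");
        System.out.println(withClause(options));
    }
}
```

The column list and the PRIMARY KEY clause would still be written by hand; only the option block is generated.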

 

Run results

[Figure: screenshot of the run output]

Checking the data shows that it has been written to the database

 

 

 

 

References:

https://blog.csdn.net/zhangjun5965/article/details/107605396

https://cloud.tencent.com/developer/article/1745233?from=article.detail.1747773

https://www.cnblogs.com/weijiqian/p/13994870.html

https://blog.csdn.net/dafei1288/article/details/118192917

 


Source: ITPUB Blog, http://blog.itpub.net/69993860/viewspace-2783228/. Please credit the source when reposting; otherwise legal action may be taken.
