一种小资源情况下RDS数据实时同步StarRocks方案

一、背景

目前需要将阿里云RDS数据库的数据同步到自建的StarRocks集群。之前使用DolphinScheduler通过定时调度Datax任务,将数据同步到StarRocks集群中,但是随着业务的发展,这种方式出现了两个问题:

1.为了满足系统三级等保的要求,阿里云RDS不再支持通过公网进行访问,只能在阿里云内网中进行访问。

2.随着业务的发展,批量的数据同步已经无法满足业务对数据更新频率的要求。

为了解决以上的问题,诞生了如下的数据同步架构。

二、数据同步架构

为了解决上面面临的问题,设计了如下的数据同步架构,来进行数据的实时同步。具体架构如下:

1.使用一台4C8G的阿里云服务器,该服务器可以访问内网的RDS服务器。

2.将KAFKA集群开通公网访问。

3.在这台阿里云服务器上面部署数据实时同步的脚步,一边实时读取RDS的binlog,将其解析加密之后发送到KAFKA中。

4.在公司内网环境中创建KAFKA CONNECTOR集群,创建connector将kafka数据解密之后同步到公司自建的StarRocks中。

三、现有方案的调研

建设初期调研了一些现在主流的方案,但是发现各个方案都存在一定的问题吗,从而选择了目前的这种方式,调用的现有解决方案有:

  1. Flink-CDC

Flink-CDC是目前最流行的实时数据同步方案,但是经过调研发现Flink集群所需资源太大,目前只有一台4C8G的阿里云服务器,而且这已经是能得到的最高配置。 需要同步的表分布在两个RDS实例当中,经过梳理达到了5000多张

用于目前业务的分库分表多种多样(按照范围分表, 按照组织分表,按照年度分表,按照月度分表,按照组织月度组合分表等等),业务表因为历史原因存在大量的字段不规范问题(全大写、全小写、驼峰、下划线等等),采用Flink-CDC如果分表建设task,则资源根本不够,如果耦合在一起,则需要进行大量的编码,后续修改复杂,因此放弃该方案。

  1. Apache SeaTunnel

Apache SeaTunnel是目前流行的另外一个实时数据同步工具,但是其目前无法支持表的模糊匹配,由于业务系统中存储多种方式的分库分表技术,而且分表数量巨大,有些表分表数量成百上千,有些按照组织分表的则是随时可能新增表,导致其很难进行兼容,需要进行上千张表的配置,基本没有可行性,所以放弃该方案。

四、核心步骤技术方案

  1. binlog实时消费

binlog的实时数据同步采用开源项目python-mysql-replication进行实现,python-mysql-replication是在PyMYSQL之上构建的MySQL复制协议的纯Python实现,通过其可以很简单的实时消费RDS数据库的binlog。

Pure Python Implementation of MySQL replication protocol build on top of PyMYSQL. This allows you to receive event like insert, update, delete with their datas and raw SQL queries.

  1. 数据加解密

为了保证数据在两个内网直接传输的安全,要求需要对进行传输的数据进行加密,经过调研之后选择了 AES-GCM对称加密,AES-GCM是一种 高效,支持硬件加速 ,适用于大数据量加密、文件加密、流加密 。

  1. 数据同步到StarRocks

从kafka消费数据到StarRocks,采用的使用StarRocks官方支持的starrocks-connector-for-kafka,但是由于我们的数据进行了加密操作,所以需要对该组件进行扩展,再其中加入进行数据解密的操作。

  1. kafka内外网映射

由于RDS和StarRocks在两个不同的内网之中,为了连通两个内网,使用kafka进行数据的中转操作。这就需要kafka能够提供公网的访问。通过配置不同的advertised.listeners来进行实现。

  1. 批量同步

在进行一张历史已经存在的表数据同步的时候,需要先同步历史已经存在的数据,然后再按照binlog实时进行新数据的同步工作,历史数据的同步采用DataX来进行同步。

  1. DataX写Kafka

DataX属于批量数据同步的组件,而Kafka属于流式数据同步的组件,两者的定位不一致,因此DataX官方并没有用于Kafka的Writer,这就需要我们自己进行扩展,编写Kafka-Writer,来进行支持。

  1. StarRocks表的增删改

StarRocks中存在主键表模型,该模型支持数据的增删改操作,同时starrocks-connector-for-kafka底层采用StreamLoad进行实现,StreamLoad支持通过在数据中增加对应的__op字段来支持对表的数据进行增删改。

  1. 分库分表的支持

系统存在多种方式的分库分表,由于分库分表之后的主键可能重复,因此可以在数据同步的时候,对分库分表进行分析,设计以 (表名,原表主键) 或者 (库名,表名,原表主键)作为对应StarRocks表的主键,来进行对应的支持操作。

五、数据同步过程说明

下面以一张已经存在的表如何进行数据同步为例,进行整个数据同步过程的说明:

  1. 根据要同步的RDS中表的结构信息,在StarRocks中创建对应的表。

  2. 在kafka中创建对应的进行历史批量数据同步的topic和binlog增量同步的topic。

  3. 在进行增量同步的脚步中新增这张表的binlog同步配置,将binlog数据写入用于增量同步的kafka的topic中。

  4. 使用DataX将历史数据全量同步到用于批量同步的kafka的topic中。

  5. 创建用于同步历史数据到StarRocks表中的Connector,消费批量topic中的数据。

  6. 根据DataX返回的同步数据量和StarRocks中已经接收到的数据量进行比对,如果一致则表明历史数据已经全部同步完成,此时可以删除删除用于历史数据同步的topic和Connector,也可以保留不管。

  7. 创建用于增量同步的Connector,消费binlog数据,实时接入StarRocks。

六、具体的实现

  1. DataX的kafkawriter实现

进行数据加密的实现:

  1. starrocks-connector-for-kafka的实现

  1. Kafka的公网配置

Kafka的内外网配置,只需要修改kafka/config下面的server.properties文件中的如下配置即可。

  1. Kafka-Connector的部署流程

a. 创建一个目录,来存放connect的

b. 修改kafka的配置文件config/connect-distributed.properties(以1台为例)

c. 启动connect(以1台为例)

  1. 监听数据库binlog文件并加密发送到kafka

脚本中依赖的版本信息

七、配套生态脚本

  1. 批量与增量配置文件的生成

在同步一张新表的时候,可以修改改脚本中的RDS数据库的信息,运行该脚本,自动生成各个数据同步步骤的配置文件和脚本信息。

如果 数据库名称:test 表名称为:test_1, StarRocks中表名称为:ods_test_1, 运行该脚本之后会生成如下的文件

  • test.test_1.json 该文件是用于binlog同步的配置文件。

  • mzt_ods_cjm_all.test_1_connect.json 该文件是用于历史数据批量同步的KafkaConnector配置。

  • mzt_ods_cjm_all.test_1_datax_config.json 该文件是用于历史数据批量同步的DataX配置。

  • mzt_ods_cjm_all.test_1_datax_shell.sh 该文件是用于执行DataX任务的启动脚本。

  • mzt_ods_cjm_stream.ods_test_1-connect.json 该文件是用于增量数据同步的KafkaConnector配置。

  • ods_test_1_create_table.sql 该文件是用于在StarRocks中建表的SQL脚本文件。

  1. Kafka-Connector操作脚本

该脚本包含了Kafka Connector操作的各个API,可以很方便的进行Kafka Connector相关的操作或者各个任务的状态查询。

  1. StarRocks表最新日期检测脚本

该脚本用于检测StarRocks各个表中的最新的数据的时间,可以用于判断当前数据同步是否正常时使用。

表信息的配置文件,配置需要检测的表和对应的时间字段。

  1. 增量同步任务检测脚本

该脚本用于检测当前的数据同步任务脚本是否正常运行,未运行可以直接启动脚本,可以配置crontab实现服务的异常终止直接启动操作,可以加入消息告警。

  1. 增量同步的配置文件示例

这是一个用于增量数据同步的配置文件,其配置了具体的某张表的增量数据同步规则。

KEY
说明

key

表的唯一id,用于匹配binlog日志,组成为库.表 或者 模糊匹配开头

topic

数据写入的kafka的topic名称

need_table

列中是否需要加入表明,如何为true,则列中会加入一个字段table_name,值为当前RDS中表的名称,用于解决分库分表的问题

complete_regex

是否完整匹配表名称,如果为true,则根据key完整匹配表名称,用于解决分库分表的问题, 如果为false,则根据regex的值进行正常匹配

regex

匹配binlog表的正则表达式

column_mapping

表的列字段的映射,key为RDS中表字段名称。value为StarRocks中表字段的名称

  1. kafkaConnector的配置示例

KEY
说明

name

kafka connector的唯一名称

config.connector.class

连接器 默认值

config.topics

要消费的topic列表,多个使用,分隔

config.key.converter

key的转换器,保持默认

config.value.converter

value的转换器,保持默认

config.key.converter.schemas.enable

是否需要转换key

config.value.converter.schemas.enable

是否需要转换value

config.starrocks.http.url

StarRocks用于streamLoad的地址

config.starrocks.topic2table.map

topic与表的映射, 格式为 topic名称:表名, 多个直接使用,分隔

config.starrocks.username

StarRocks的用户名

config.starrocks.password

StarRocks的密码

config.starrocks.database.name

StarRocks数据库的名称

config.sink.properties.strip_outer_array

是否展开JSON数组

config.sink.properties.columns

列字段的列表

config.sink.properties.jsonpaths

JSON字段的列表,可列字段的列表一一对应

config.transforms

数据的转换器

config.transforms.decrypt.type

转换器的实现类

config.transforms.decrypt.secret.key

数据解密的秘钥

八、备注

  1. python-mysql-replication python实现的用于binlog同步的库。

  2. starrocks-connector-for-kafka Kafka Connector是StarRocks数据源连接器

  3. DataX 批量数据同步工具

  4. kafka-console-ui Kakfa可视化控制台

  5. StarRocks-kafka-Connector 通过kafkaConnector导入数据到StarRocks

  6. Kafka Connector的API列表

方法
路径
说明

GET

/connectors

返回活动连接器的列表

POST

/connectors

创建一个新的连接器; 请求主体应该是包含字符串name字段和config带有连接器配置参数的对象字段的JSON对象

GET

/connectors/{name}

获取有关特定连接器的信息

GET

/connectors/{name}/config

获取特定连接器的配置参数

PUT

/connectors/{name}/config

更新特定连接器的配置参数

GET

/connectors/{name}/status

获取连接器的当前状态,包括连接器是否正在运行,失败,已暂停等,分配给哪个工作者,失败时的错误信息以及所有任务的状态

GET

/connectors/{name}/tasks

获取当前为连接器运行的任务列表

GET

/connectors/{name}/tasks/{taskid}/status

获取任务的当前状态,包括如果正在运行,失败,暂停等,分配给哪个工作人员,如果失败,则返回错误信息

PUT

/connectors/{name}/pause

暂停连接器及其任务,停止消息处理,直到连接器恢复

PUT

/connectors/{name}/resume

恢复暂停的连接器(或者,如果连接器未暂停,则不执行任何操作)

POST

/connectors/{name}/restart

重新启动连接器(通常是因为失败)

POST

/connectors/{name}/tasks/{taskId}/restart

重启个别任务(通常是因为失败)

DELETE

/connectors/{name}

删除连接器,停止所有任务并删除其配置

  1. 加密算法参考

加密类型
推荐算法
优点
缺点
适用场景

对称加密

AES-GCM, ChaCha20

高效,支持硬件加速

需要安全管理密钥

大数据量加密、文件加密、流加密

非对称加密

RSA, ECDSA

无需共享密钥,高安全性

速度慢,不适合大数据加密

密钥交换、数字签名

流加密

ChaCha20

高效,低延迟

不适合文件加密

实时通信、视频流加密

哈希算法

SHA-256, BLAKE3

不可逆,速度快

不能用于加解密

数据校验、数字签名

九、踩坑记录

  1. python虚拟环境pip报错,没有SSL模块。

解决:使用支持http的pip源进行安装

  1. DATAX同步时间戳字段,Kafka中为数字,无法写入StarRocks的datetime类型字段中。

解决:在DATAX的同步字段映射中,使用DATA_FORMAT将其转换为字符串。

  1. AES-GCM在python端和Java端的实现问题。

解决:在python中加密会生成(nonce, chiphertext, tag)三元组信息,但是Java中解密会报错,在python中将 ciphertext和tag拼接起来,在Java中可以直接解密。

  1. BigDecimal字段类型,starrocks-connector-for-kafka无法解析报错。

解决:在starrocks-connector-for-kafka中进行解密的时候,FastJSON配置将BigDecimal转换为Double类型。

  1. 自定义打包的starrocks-connector-for-kafka,kafka Connector无法加载。

解决:必须使用Java8进行打包,使用了Java21打包,导致无法加载。

  1. 使用最新版的python-mysql-replication读取binlog,解析不到表字段。

解决:不要使用最新版本,使用0.45.1版本,可参考:issue#612

  1. python3.6无法运行python-mysql-replication。

解决:python-mysql-replication不支持python3.6,至少需要3.7版本,本项目使用3.8.4版本

  1. python JSON转换不支持byte,日期格式。

解决:自定义python的JSON转换格式。

  1. 设置python-mysql-replication的only_events导致消费不到任何binlog。

解决:经过测试,最终舍弃该参数的配置。

  1. 脚本运行过程中占用内存过大,导致其被系统kill

解决:可以手动触发GC垃圾回收,主动回收释放内存

十、目前数据同步情况

指标KEY
指标值

RDS实例数

2

同步逻辑表数量

56

同步物理表数量

5274

数据延迟

1分钟以内

Last updated

Was this helpful?