基于自定义脚本的数据同步

一、背景

自从采用Apache DolphinScheduler + StarRocks数据方案以来,一切都很平稳发展;但是随着时间的推移,总会出现新的问题。

随着数据量的增多,使用方需求的增长,已经一些其他因素的影响,对目前的数据同步架构带来了一些不小的挑战,这些问题导致任务的维护和更新越来越麻烦,需要耗费大量的时间来进行,急需一种新的方式来处理。

  1. 由于等保的要求,线上RDS数据库不再支持通过公网访问,又因为StarRocks也在内网,这就导致了之前的数据同步链路彻底断裂,需要新的方案。

  2. 由于数据结构的频繁变更、服务器资源导致的任务调度异常等等原因,需要重跑数据的需求越来越多,这就导致需要不断的修改任务的调度参数(如日期),目前已经上线了10个业务的调度任务,也就是重新同步一次,就需要依次修改调度这10个任务,这期间还需要专人进行状态的跟踪,即使修改调度,压力很大。

二、数据同步架构

鉴于数据链路变更,导致原本数据链路断裂的问题,通过调研之后,决定采用KAFKA进行数据的中转,在内网部署KAFKA集群,同时该集群提供公网访问地址;在RDS所在的内网机器上使用DataX将RDS数据通过公网地址写入KAFKA,在内网中通过KafkaConnector消费数据写入StarRocks。

鉴于新的资源有限,原本内网提供了4台8C32G的服务器,但是新的RDS所在内网只能提供一台最大4C8G的服务器。因此放弃了使用Apache DolphinScheduler来进行调度,直接使用crontab调用对应的Python脚本进行DataX任务调度。

三、具体的数据同步

新的方案,主要解决的问题有两个,一是DataX如何将数据写入KAFKA,二是Python脚本怎么解决前面遇到的修改复杂的问题。

  1. DataX写KAFKA

DataX本身并没有kafkawriter实现,这就需要我们自己实现一个KafkaWriter来支持我们的需求,同时为了数据安全,希望能够对数据进行加密。

DataX的KafkaWriter实现

进行数据加密的实现:

Kafka的公网配置 Kafka的内外网配置,只需要修改kafka/config下面的server.properties文件中的如下配置即可。

  1. 自定义的配置文件

Python脚本需要能够自动生成对应的DataX调度的配置文件和shell脚本,自动调度DataX进行任务的执行。因此经过调研,采用自定义配置文件,通过读取配置文件,动态生成对应的DataX任务脚本和调度脚本,调度任务执行。

自定义的配置文件示例1:

支持分库分表的配置文件示例2

如上的配置文件,解释如下:

KEY
说明

datasource

RDS数据源

datasource.host

RDS数据库的host

datasource.port

RDS数据库的端口

datasource.username

RDS数据库的用户名

datasource.password

RDS数据库的密码

datasource.properties

jdbc连接的参数,连接时拼接为?key=value&key=value

table

要同步的表信息

table.database

RDS数据库名称

table.table

RDS中表的名称,分库分表的可以为空

table.column

RDS表中要同步的字段列表,支持取别名和使用函数

table.where

同步数据的过滤条件

table.searchTableSql

查询表名称的SQL语句,用于动态分库分表

kafka

kafka相关的配置

kafka.topic

数据要写入的kafka topic的名称

  1. Python调度脚本

  1. 同步日期的控制

我们在之前的任务同步中,遇到的问题便是日期的修改很麻烦,因此我们需要一个更加简单的方式来进行日期的批量更新。在我们上面的调度脚本中,包含了对日期表达式的解析,我们自定义了一种时间的表达式$[yyyyMMddHHmmss+/-N_Y] 通过解析该表达式,我们可以生成需要的任意时间,该时间表达式的含义为:

  • yyyy 表示年份

  • MM 表示月份

  • dd 表示日期

  • HH 表示24进制小时

  • mm 表示分钟

  • ss 表示秒

    • 表示当前时间加上N

    • 表示当前时间减去N

  • Y 表示加减的单位,可以是YMdHms(年、月、日、时、分、秒)

通过对该表达式的解析,我们可以生成相对于当前之前或之后的任何格式的时间字符串,将其用于同步的where条件中,既可以完成针对时间的解析。

  1. 如何更新日期

日期目前可以计算,但是我们需要能够批量修改配置文件中的WHERE条件中的时间表达式,如我们想同步8天前的数据,我们就需要将脚本中的表达式修改为$[yyyyMMdd-8_d] ,即代表当前时间减去8天,这样我们就可以同步八天前那一天的数据,但是我们可能想同步从8天气到现在的所有数据,那么我们希望我们也能批量修改where表达式中的条件,如将=改为>=。

鉴于以上的需求,我们开发了一个新的Python脚本,通过简单的配置,即可一次修改所有脚本中的where条件中的表达式,这样,我们只需要执行两个脚本,就完成了一切,再也不需要依次修改执行10个工作流了。

  1. 通过KafkaConnector同步数据到StarRocks

    1. starrocks-connector-for-kafka的实现

StarRocks官方提供了starrocks-connector-for-kafka的实现,我们只需要在其中加入我们的数据解密逻辑即可直接使用。

解密的逻辑

四、备注

  1. starrocks-connector-for-kafka Kafka Connector是StarRocks数据源连接器

  2. DataX 批量数据同步工具

  3. kafka-console-ui Kakfa可视化控制台

  4. StarRocks-kafka-Connector 通过kafkaConnector导入数据到StarRocks

  5. Kafka Connector的API列表

方法
路径
说明

GET

/connectors

返回活动连接器的列表

POST

/connectors

创建一个新的连接器; 请求主体应该是包含字符串name字段和config带有连接器配置参数的对象字段的JSON对象

GET

/connectors/{name}

获取有关特定连接器的信息

GET

/connectors/{name}/config

获取特定连接器的配置参数

PUT

/connectors/{name}/config

更新特定连接器的配置参数

GET

/connectors/{name}/status

获取连接器的当前状态,包括连接器是否正在运行,失败,已暂停等,分配给哪个工作者,失败时的错误信息以及所有任务的状态

GET

/connectors/{name}/tasks

获取当前为连接器运行的任务列表

GET

/connectors/{name}/tasks/{taskid}/status

获取任务的当前状态,包括如果正在运行,失败,暂停等,分配给哪个工作人员,如果失败,则返回错误信息

PUT

/connectors/{name}/pause

暂停连接器及其任务,停止消息处理,直到连接器恢复

PUT

/connectors/{name}/resume

恢复暂停的连接器(或者,如果连接器未暂停,则不执行任何操作)

POST

/connectors/{name}/restart

重新启动连接器(通常是因为失败)

POST

/connectors/{name}/tasks/{taskId}/restart

重启个别任务(通常是因为失败)

DELETE

/connectors/{name}

删除连接器,停止所有任务并删除其配置

Last updated

Was this helpful?