基于Apache DolphinScheduler的数据同步
一、背景
自从上次开始使用基于Hadoop的大数据体现方案之后,业务平稳发展,但是随着时间的推移,新的问题开始出现,主要出现的问题为两个:
数据的变更越来越频繁,基于之前SparkSQL任务的方式,只要需要对表结构进行变更,就需要重新修改Scala代码,然后重新进行任务的打包,这对于一些不熟悉代码的人来说,不太友好,而且成本也很高。
虽然使用了Presto对HIVE的数据查询进行了加速,但是所在数据量越来越大,分析要求越来越复杂,即席查询越来越多,由于集群本身资源有限,查询能力出现了显著瓶颈。
二、数据同步架构
随着技术的发展已经对大数据的认识,接触到了更多的大数据相关的知识与组件,基于此,通过认真分析与思考之后,对数据的同步方案进行了如下的重新设计。
数据存储与查询放弃了HDFS+HIVE+Presto的组合,转而采用现代化的MPP数据库StarRocks,StarRocks在数据查询的效率层面非常优秀,在相同资源的情况下,可以解决目前遇到的数据查询瓶颈。
数据同步放弃了SparkSQL,转而采用更加轻量级的DATAX来进行,其只需要通过简单的配置,即可完成数据的同步,同时其也支持StarRocks Writer,开发人员只需要具备简单的SQL知识,就可以完成整个数据同步任务的配置,难度大大降低,效率大大提升,友好度大大提升。
定时任务调度放弃Azkaban,采用现代化的任务调度工作Apache DolphinScheduler,通过可视化的页面进行调度任务工作流的配置,更加友好。

三、数据同步的详细流程
数据同步在这种方式下变动非常简单,只需要可视化的配置DataX任务,即可自动调度。下面的一个任务的配置示例
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "",
"password": "",
"connection": [
{
"querySql": [
"SELECT CustomerId AS customer_id FROM base_info.base_customer where date(UpdateTime) > '${sdt}' and date(UpdateTime) < '${edt}'"
],
"jdbcUrl": [
"jdbc:mysql://IP:3306/base_info?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
]
}
]
}
},
"writer": {
"name": "starrockswriter",
"parameter": {
"username": "xxx",
"password": "xxx",
"database": "ods_cjm_test",
"table": "ods_base_customer",
"column": ["id"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://IP:9030/",
"loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
"loadProps": {
"format": "json",
"strip_outer_array": true
}
}
}
}
]
}
}
数据同步过程中,遇到了另外一个问题,即业务存在大量的分库分表的,这些分库分表的逻辑五花八门,60张左右的逻辑板,经过分库分表之后达到了惊人的5000多张,为每张表配置任务很显然不太正常,这就需要能够在进行数据同步的时候动态生成需要的表列表,把表列表配置到DataX的配置文件中去。
经过技术的调用,Apache DolphinScheduler的Python任务类型很适合做这个事情,由于公司本身使用了Apache DolphinScheduler3.0的版本,其Python任务还不支持返回数据到下游节点,但是社区最新版本已经支持该能力,因为按照已实现版本对其进行改造。
改造之后,Python节点能够将数据传递给他的下游节点,因此使用Python脚本查询获取需要进行同步的表列表,将其传递给DataX节点,完成动态表的数据同步
import pymysql
import datetime
def select_all_table(date: str):
result_list = []
sql = """
SELECT concat('"', table_name, '"')
FROM information_schema.`TABLES`
WHERE table_schema='hydra_production_flow'
and table_name like 't_package_flow_log_%'
and table_name like '%_{}'
""".format(date)
conn = pymysql.connect(host='', port=3306, user='', passwd='',
db='information_schema')
cur = conn.cursor()
cur.execute(query=sql)
while 1:
res = cur.fetchone()
if res is None:
break
result_list.append(res[0])
cur.close()
conn.close()
return result_list
if __name__ == '__main__':
# 获取当前年月
# 获取当前日期
today = datetime.date.today()
# 计算前一天的日期
yesterday = today - datetime.timedelta(days=1)
current_date = yesterday.strftime("%Y_%m")
table_list = select_all_table(current_date)
table_str = ",".join(table_list)
# 设置变量,传递给下游节点
print('${setValue(table_list=%s)}' % table_str)
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "xxx",
"password": "xxxx",
"column": [
"id",
"concat('t_package_flow_log_',DATE_FORMAT(create_time,'%Y_%m'))",
"operation_type"
],
"where": "date(create_time) ${operator_symbol} '${dt}'",
"connection": [
{
"table": [
${table_list}
],
"jdbcUrl": [
"jdbc:mysql://xx:3306/hydra_production_flow?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
]
}
]
}
},
"writer": {
"name": "starrockswriter",
"parameter": {
"username": "xxxxxx",
"password": "xxxxxxx",
"database": "ods_cjm",
"table": "ods_t_package_flow_log",
"column": ["id", "table_name","operation_type"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://IP:9030/",
"loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
"loadProps": {
"format": "json",
"strip_outer_array": true
}
}
}
}
]
}
}
四、踩坑记录
DATAX只支持python2.x
下载支持python3.x的相关文件,替换DataX中的相同文件,即可支持python3.x使用
五、备注
StarRocks 高性能的MPP数据库
DataX 离线数据同步
Apache DolphinScheduler 任务调度工具
Last updated
Was this helpful?