{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "",
                        "password": "",
                        "connection": [
                            {
                                "querySql": [
                                    "SELECT CustomerId AS customer_id FROM base_info.base_customer where date(UpdateTime) > '${sdt}' and date(UpdateTime) < '${edt}'"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://IP:3306/base_info?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "username": "xxx",
                        "password": "xxx",
                        "database": "ods_cjm_test",
                        "table": "ods_base_customer",
                        "column": ["id"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://IP:9030/",
                        "loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": true
                        }
                    }
                }
            }
        ]
    }
}
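The ${sdt} and ${edt} placeholders in querySql are filled in by the scheduler at run time. As a minimal sketch (assuming a one-day window and the setValue mechanism described later in this article; the parameter names are taken from the config above), an upstream DolphinScheduler Python task could set them like this:

import datetime

# Hypothetical upstream task: compute a one-day sync window for the exclusive
# bounds date(UpdateTime) > '${sdt}' and date(UpdateTime) < '${edt}'.
today = datetime.date.today()
sdt = (today - datetime.timedelta(days=2)).strftime("%Y-%m-%d")  # day before yesterday
edt = today.strftime("%Y-%m-%d")                                 # today
print('${setValue(sdt=%s)}' % sdt)
print('${setValue(edt=%s)}' % edt)

Because both bounds are exclusive, this window syncs exactly the rows updated yesterday.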
Another problem surfaced during data synchronization: the business makes heavy use of sharded databases and tables, and the sharding logic varies wildly. After sharding, roughly 60 logical tables balloon into an astonishing 5,000+ physical tables. Configuring a sync job for every one of them is clearly impractical, so the required table list has to be generated dynamically at sync time and injected into the DataX configuration file.
After some technical investigation, Apache DolphinScheduler's Python task type proved a good fit for this. The company runs Apache DolphinScheduler 3.0, whose Python task cannot yet return data to downstream nodes; the latest community version already supports this capability, so we backported it based on the community implementation. The task below enumerates the shard tables and passes the list downstream:
import pymysql
import datetime


def select_all_table(date: str):
    # Query information_schema for all shard tables whose name ends with the
    # given year_month suffix. Each name is wrapped in double quotes by the SQL
    # itself so the joined result can be dropped straight into a JSON array.
    sql = """
        SELECT concat('"', table_name, '"')
        FROM information_schema.`TABLES`
        WHERE table_schema = 'hydra_production_flow'
          AND table_name LIKE 't_package_flow_log_%'
          AND table_name LIKE '%_{}'
    """.format(date)
    conn = pymysql.connect(host='', port=3306, user='', passwd='',
                           db='information_schema')
    cur = conn.cursor()
    cur.execute(sql)
    result_list = [row[0] for row in cur.fetchall()]
    cur.close()
    conn.close()
    return result_list


if __name__ == '__main__':
    # Derive the year_month suffix from yesterday's date
    today = datetime.date.today()
    yesterday = today - datetime.timedelta(days=1)
    current_date = yesterday.strftime("%Y_%m")
    table_list = select_all_table(current_date)
    table_str = ",".join(table_list)
    # Set the variable and pass it to downstream nodes
    print('${setValue(table_list=%s)}' % table_str)
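Note why the SQL concatenates double quotes around each table name: after DolphinScheduler substitutes ${table_list} into the job file, the joined string must already form valid JSON array elements. With hypothetical shard names, the expansion works like this:

# Hypothetical shard names as returned by select_all_table("2024_05"):
table_list = ['"t_package_flow_log_01_2024_05"', '"t_package_flow_log_02_2024_05"']
table_str = ",".join(table_list)
print('${setValue(table_list=%s)}' % table_str)
# Printed: ${setValue(table_list="t_package_flow_log_01_2024_05","t_package_flow_log_02_2024_05")}
# After substitution, the "table" array in the DataX job below reads:
#   "table": ["t_package_flow_log_01_2024_05","t_package_flow_log_02_2024_05"]

The DataX job that consumes the variable: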
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxx",
                        "password": "xxxx",
                        "column": [
                            "id",
                            "concat('t_package_flow_log_',DATE_FORMAT(create_time,'%Y_%m'))",
                            "operation_type"
                        ],
                        "where": "date(create_time) ${operator_symbol} '${dt}'",
                        "connection": [
                            {
                                "table": [
                                    ${table_list}
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://xx:3306/hydra_production_flow?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "username": "xxxxxx",
                        "password": "xxxxxxx",
                        "database": "ods_cjm",
                        "table": "ods_t_package_flow_log",
                        "column": ["id", "table_name", "operation_type"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://IP:9030/",
                        "loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": true
                        }
                    }
                }
            }
        ]
    }
}
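The where clause is parameterized the same way: ${dt} and ${operator_symbol} are presumably set by an upstream task via setValue, so a single job definition can serve both the daily incremental run and a historical backfill. A minimal sketch under that assumption:

import datetime

# Assumed upstream task feeding "date(create_time) ${operator_symbol} '${dt}'".
operator_symbol = "="  # "=" for the daily increment; "<" would backfill everything before dt
dt = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print('${setValue(operator_symbol=%s)}' % operator_symbol)
print('${setValue(dt=%s)}' % dt)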