import json
import os
import pymysql
import re
from datetime import datetime
from dateutil.relativedelta import relativedelta
import uuid
import subprocess
import logging
import hmac
import hashlib
import base64
import urllib.parse
import requests
import time
from typing import List, Mapping
def list_files_in_directory(directory_path: str) -> List[str]:
"""
获取目录下的所有以.json结尾的文件
:param directory_path: 目录
:return: 文件列表
"""
entries = os.listdir(directory_path)
    # keep only regular files with a .json extension
files = [entry for entry in entries if
os.path.isfile(os.path.join(directory_path, entry)) and entry.endswith(".json")]
logging.info(f"读取配置文件数量:{len(files)}")
return files
def read_file_content(file_path: str) -> str:
"""
读取文件内容
:param file_path: 文件路径
:return: 文件内容
"""
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
return content
def read_all_files_in_directory(directory_path: str) -> Mapping[str, str]:
"""
读取文件夹下面的所有文件的内容
:param directory_path: 文件夹路径
:return: 内容map
"""
logging.info(f"开始读取所有的配置文件信息")
files = list_files_in_directory(directory_path)
file_contents = {}
for file in files:
file_path = os.path.join(directory_path, file)
content = read_file_content(file_path)
file_contents[file] = content
    # return the contents sorted by file name so tasks run in a deterministic order
    sorted_items = sorted(file_contents.items())
    return dict(sorted_items)
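# Note: run_job() below reads its task configs from <base_path>/task/config/, so an
# illustrative layout (placeholder file names, not taken from a real deployment) is:
#   /opt/data-job/task/config/t_order.json
#   /opt/data-job/task/config/t_user.json
# with one task configuration per file; see the config sketch after general_datax_job_config.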
def search_table_list(datasource: dict, search_table_sql_list: List[str]) -> List[str]:
    """
    Run the configured SQL statements to obtain the list of tables to sync.
    :param datasource: datasource connection info
    :param search_table_sql_list: SQL statements that return table names
    :return: list of table names
    """
    logging.info("Start querying the tables to synchronize")
host = datasource['host']
port = int(datasource['port'])
username = datasource['username']
password = datasource['password']
conn = pymysql.connect(host=host,
port=port,
user=username,
passwd=password,
db='',
charset='utf8',
connect_timeout=200,
autocommit=True,
read_timeout=2000
)
table_name_list = []
    try:
        for search_table_sql in search_table_sql_list:
            search_table_sql = parse_where_sql(search_table_sql)
            with conn.cursor() as cursor:
                cursor.execute(query=search_table_sql)
                while True:
                    res = cursor.fetchone()
                    if res is None:
                        break
                    table_name_list.append(res[0])
    finally:
        conn.close()
    return table_name_list
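# Illustrative searchTableSql statement (a hypothetical example, not taken from a real
# config); any statement works as long as the first column of every row is a table name:
#   SELECT table_name FROM information_schema.tables
#   WHERE table_schema = 'test' AND table_name LIKE 't_order_%'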
def general_default_job_config() -> dict:
    """
    Build the default DataX job configuration.
    :return: default configuration as a dict
    """
default_job_json = """
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "test",
"password": "test1234",
"connection": [
{
"querySql": [
"SELECT id, code from test.t_open_api_classify"
],
"jdbcUrl": [
"jdbc:mysql://IP:3306/test?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
]
}
]
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"bootstrapServers": "IP:9092,IP:9092,IP:9092",
"topic": "test-m-t-k",
"ack": "all",
"batchSize": 1000,
"retries": 0,
"keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
"valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
"fieldDelimiter": ",",
"writeType": "json",
"topicNumPartition": 1,
"topicReplicationFactor": 1,
"encryptionKey": "5s8FGjerddfWkG/b64CGHHZYvQ=="
}
}
}
]
}
}
"""
    return json.loads(default_job_json)
def general_jdbc_url(json_config: dict) -> str:
    """
    Build the JDBC URL from the datasource configuration.
    :param json_config: task configuration
    :return: JDBC URL
    """
    logging.info("Start building the JDBC URL")
host = json_config['datasource']['host']
port = int(json_config['datasource']['port'])
database = json_config['table']['database']
url = "jdbc:mysql://{}:{}/{}".format(host, port, database)
    # append the optional connection properties as query parameters
properties = json_config['datasource']['properties']
properties_list = []
if properties is not None and len(properties) > 0:
for key, value in properties.items():
properties_list.append(key + "=" + str(value))
url = url + "?" + "&".join(properties_list)
logging.info(f"jdbc url: {url}")
return url
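# Example with made-up values: host 127.0.0.1, port 3306, database test and
# properties {"characterEncoding": "utf-8", "useSSL": "false"} would produce
#   jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false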
def parse_where_sql(where_sql: str) -> str:
"""
解析where语句
:param where_sql: 原始where语句
:return: 转换之后的where语句
"""
# 定义支持的类型 $[yyyyMMdd+N_Y] $[yyyyMMdd-N_Y]
# 正则表达式模式
logging.info(f"还是解析where语句:where_sql: {where_sql}")
pattern = r"\$\[.*?\]"
return re.sub(pattern, replacement_function, where_sql)
def replacement_function(match):
"""
替换函数
:param match: 匹配结果
:return: 替换之后的结果
"""
matched_text = match.group(0)
return calc_datetime(matched_text)
def calc_datetime(expression: str) -> str:
"""
计算时间表达式
:param expression: 表达式
:return: 计算之后的值
"""
logging.info(f"开始计算时间参数:expression: {expression}")
# 设置映射
format_units = {
"yyyy": "%Y",
"MM": "%m",
"dd": "%d",
"HH": "%H",
"mm": "%M",
"ss": "%S"
}
unit_map = {
"Y": "yyyy",
"M": "MM",
"d": "dd",
"H": "HH",
"m": "mm",
"s": "ss"
}
    # strip the leading "$[" and trailing "]"
    expression = expression[2:-1]
    # replace each format unit with its strftime directive and track the smallest unit
min_unit = None
for key, value in format_units.items():
if key in expression:
min_unit = key
expression = expression.replace(key, value)
    # units replaced; now check whether the expression carries an offset
    logging.info(f"Expression converted to Python strftime format: expression: {expression}")
    # pattern: <date part><+/-N[*M]>[_<unit>]
    pattern = r'([^0-9]+)([-+]\d+(\*\d+)?)(?:_([YMdHms]))?'
matches = re.match(pattern, expression)
    # use the captured groups when the expression carries an offset
    if matches:
        date_part = matches.group(1)
        remainder = matches.group(2)
        unit = matches.group(4)
        if unit is not None and unit in unit_map:
            min_unit = unit_map[unit]
return calculate_expression(min_unit, date_part, remainder)
else:
return expression
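# Worked example (assuming today is 2024-05-10): "$[yyyyMMdd-1_d]" becomes the
# strftime pattern "%Y%m%d" with offset "-1" and unit "d", so calc_datetime returns
# "20240509"; "$[yyyyMMddHH-1*24_H]" would instead subtract 1*24 hours.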
def calculate_expression(min_unit: str, date_part: str, remainder: str) -> str:
    """
    Apply the offset to the current time and format the result.
    :param min_unit: smallest format unit found in the expression
    :param date_part: strftime format string
    :param remainder: offset part, e.g. "-1" or "+2*24"
    :return: formatted date/time string
    """
    logging.info(f"Start evaluating: min_unit: {min_unit}, date_part: {date_part}, remainder: {remainder}")
    # current date and time
    now = datetime.now()
    # apply the offset, if any
    if remainder is None:
        # no offset: format the current time directly
        formatted_datetime = now.strftime(date_part)
        logging.info(f"No offset given, returning: {formatted_datetime}")
return formatted_datetime
    else:
        # split the offset into sign and magnitude; "N*M" multiplies the factors
        plus_or_sub = remainder[0:1]
        offset = 1
        for factor in remainder[1:].split('*'):
            offset *= int(factor)
        logging.info(f"Computed offset, plus_or_sub: {plus_or_sub}, offset: {offset}")
if min_unit == 'yyyy':
if plus_or_sub == '-':
now = now - relativedelta(years=offset)
else:
now = now + relativedelta(years=offset)
elif min_unit == 'MM':
if plus_or_sub == '-':
now = now - relativedelta(months=offset)
else:
now = now + relativedelta(months=offset)
elif min_unit == 'dd':
if plus_or_sub == '-':
now = now - relativedelta(days=offset)
else:
now = now + relativedelta(days=offset)
elif min_unit == 'HH':
if plus_or_sub == '-':
now = now - relativedelta(hours=offset)
else:
now = now + relativedelta(hours=offset)
elif min_unit == 'mm':
if plus_or_sub == '-':
now = now - relativedelta(minutes=offset)
else:
now = now + relativedelta(minutes=offset)
elif min_unit == 'ss':
if plus_or_sub == '-':
now = now - relativedelta(seconds=offset)
else:
now = now + relativedelta(seconds=offset)
formatted_datetime = now.strftime(date_part)
logging.info(f"日期偏移量为空,返回值:{formatted_datetime}")
return formatted_datetime
def general_reader(json_config: dict) -> dict:
    """
    Build the reader section of the DataX job configuration.
    :param json_config: task configuration
    :return: reader section as a dict
    """
    logging.info("Start building the reader section of the DataX job JSON")
    reader_json = {}
reader_json['name'] = "mysqlreader"
reader_json['parameter'] = {}
reader_json['parameter']['username'] = json_config['datasource']['username']
reader_json['parameter']['password'] = json_config['datasource']['password']
reader_json['parameter']['column'] = json_config['table']['column']
reader_json['parameter']['connection'] = [{}]
reader_json['parameter']['connection'][0]['table'] = json_config['table']['table']
reader_json['parameter']['connection'][0]['jdbcUrl'] = [general_jdbc_url(json_config)]
where_sql = json_config['table']['where']
if where_sql is not None and where_sql != '':
reader_json['parameter']['where'] = parse_where_sql(where_sql)
return reader_json
def general_writer(json_config: dict) -> dict:
    """
    Build the writer section of the DataX job configuration.
    :param json_config: task configuration
    :return: writer section as a dict
    """
    columns = json_config['table']['column']
    new_columns = []
    for column in columns:
        # strip backquotes and keep only the alias when a column uses "expr AS alias"
        column = str(column).replace("`", "").strip()
        parts = re.split(r"\s+AS\s+", column, flags=re.IGNORECASE)
        new_columns.append(parts[-1].strip())
logging.info(f"开始生成DataX的配置JSON文件的Writer内容")
writer_json = json.loads("{}", encoding='utf-8')
writer_json['name'] = "kafkawriter"
writer_json['parameter'] = {}
writer_json['parameter']['bootstrapServers'] = "IP:19092,IP:19093,IP:19094"
writer_json['parameter']['topic'] = json_config['kafka']['topic']
writer_json['parameter']['ack'] = "all"
writer_json['parameter']['batchSize'] = 1000
writer_json['parameter']['retries'] = 3
writer_json['parameter']['keySerializer'] = "org.apache.kafka.common.serialization.StringSerializer"
writer_json['parameter']['valueSerializer'] = "org.apache.kafka.common.serialization.StringSerializer"
writer_json['parameter']['fieldDelimiter'] = ","
writer_json['parameter']['writeType'] = "json"
writer_json['parameter']['topicNumPartition'] = 1
writer_json['parameter']['topicReplicationFactor'] = 1
writer_json['parameter']['encryptionKey'] = "5s8FGjerddfWkG/b64CGHHZYvQ=="
writer_json['parameter']['column'] = new_columns
return writer_json
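# Column-name mapping above, illustrated with made-up columns: "`id`" stays "id",
# while "DATE_FORMAT(create_time,'%Y%m%d') AS create_day" is reduced to "create_day";
# the Kafka writer only needs the output field names, not the SQL expressions.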
def general_datax_job_config(datax_config: str) -> dict:
    """
    Build the full DataX job configuration for one task config file.
    :param datax_config: raw task configuration (JSON string)
    :return: complete job configuration as a dict
    """
    logging.info(f"Start building the DataX job JSON, {datax_config}")
    json_config = json.loads(datax_config)
    # decide whether the table list has to be queried from the database
datasource = json_config['datasource']
table = json_config['table']['table']
search_table_sql_list = json_config['table']['searchTableSql']
if search_table_sql_list is not None and len(search_table_sql_list) > 0:
        # query the table list and override the configured value
table_list = search_table_list(datasource, search_table_sql_list)
else:
table_list = [table]
json_config['table']['table'] = table_list
    # assemble the final job configuration
job_json = general_default_job_config()
job_json['job']['content'][0]['reader'] = general_reader(json_config)
job_json['job']['content'][0]['writer'] = general_writer(json_config)
return job_json
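# Minimal sketch of one task config file consumed by general_datax_job_config; the
# field names come from the lookups above, the values below are placeholders only:
# {
#   "datasource": {"host": "IP", "port": 3306, "username": "user", "password": "pass",
#                  "properties": {"characterEncoding": "utf-8", "useSSL": "false"}},
#   "table": {"database": "test", "table": "t_demo", "column": ["`id`", "`code`"],
#             "where": "update_time >= '$[yyyyMMdd-1_d]'", "searchTableSql": []},
#   "kafka": {"topic": "test-m-t-k"}
# }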
def write_job_file(base_path: str, job_config: dict) -> str:
    """
    Write the DataX job JSON configuration file.
    :param base_path: base path
    :param job_config: job configuration
    :return: full path of the generated JSON file
    """
    # generate a uniquely named file
    logging.info("Start creating the DataX job JSON file")
date_day = datetime.now().strftime('%Y-%m-%d')
timestamp_milliseconds = int(datetime.now().timestamp() * 1000)
    # unique file name: <uuid>_<timestamp>.json
    file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".json"
    # create the target directory and build the full file path
    mkdir_if_not_exist(base_path + "/task/datax/json/" + date_day)
    complex_file_path = base_path + "/task/datax/json/" + date_day + "/" + file_name
    logging.info(f"Full path of the DataX job JSON file: {complex_file_path}")
with open(complex_file_path, 'w+', encoding='utf-8') as f:
f.write(json.dumps(job_config, ensure_ascii=False))
return complex_file_path
def mkdir_if_not_exist(path):
"""
创建目录
:param path: 目录路径
:return: None
"""
os.makedirs(path, exist_ok=True)
def write_task_file(base_path: str, python_path: str, datax_path: str, job_file_path: str) -> str:
"""
写shell脚本文件
:param base_path: 跟路径
:param python_path: python执行文件路径
:param datax_path: datax执行文件路径
:param job_file_path: JSON配置文件路径
:return: shell脚本的完整路径
"""
# 组合内容
logging.info(f"开始创建Shell脚本文件")
task_content = python_path + " " + datax_path + " " + job_file_path
    # generate a uniquely named script
    date_day = datetime.now().strftime('%Y-%m-%d')
    timestamp_milliseconds = int(datetime.now().timestamp() * 1000)
    # unique file name: <uuid>_<timestamp>.sh
    task_file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".sh"
    # create the target directory and build the full file path
    mkdir_if_not_exist(base_path + "/task/datax/shell/" + date_day)
    complex_file_path = base_path + "/task/datax/shell/" + date_day + "/" + task_file_name
    logging.info(f"Full path of the shell script: {complex_file_path}")
with open(complex_file_path, 'w+', encoding='utf-8') as f:
f.write(task_content)
    # add execute permission (0o111 adds the execute bit for user, group and others)
    current_permissions = os.stat(complex_file_path).st_mode
    new_permissions = current_permissions | 0o111
    os.chmod(complex_file_path, new_permissions)
return complex_file_path
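# With the defaults in __main__ the generated script contains a single command of the form
#   /usr/bin/python3 /opt/datax-k/bin/datax.py <base_path>/task/datax/json/<date>/<uuid>_<ts>.json
# i.e. it just launches DataX with the job file produced by write_job_file.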
def signs(dd_secret: str, timestamp: str) -> str:
    """
    Compute the DingTalk robot signature.
    :param dd_secret: secret key
    :param timestamp: timestamp in milliseconds
    :return: URL-encoded signature
    """
secret_enc = dd_secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, dd_secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote(base64.b64encode(hmac_code))
return sign
def real_send_msg(dd_secret: str, dd_access_token: str, text: dict):
    """
    Send a message via the DingTalk robot webhook.
    :param dd_secret: secret key
    :param dd_access_token: access token
    :param text: message payload
    :return: None
    """
timestamp = str(round(time.time() * 1000))
sign = signs(dd_secret, timestamp)
headers = {'Content-Type': 'application/json'}
    web_hook = f'https://oapi.dingtalk.com/robot/send?access_token={dd_access_token}&timestamp={timestamp}&sign={sign}'
    # post the payload
requests.post(web_hook, data=json.dumps(text), headers=headers)
def send_msg(dd_secret: str, dd_access_token: str, job_start_time: str, total_count: int, success_count: int, fail_task_list: List[str]):
"""
组合钉钉消息
:param dd_secret: 秘钥
:param dd_access_token: token
:param job_start_time: 任务开始时间
:param total_count: 总任务数
:param success_count: 成功任务数
:return: NONE
"""
    title = '### <font color=#CCCC00>Data sync result</font>'
    if success_count == total_count:
        title = '### <font color=#00FF00>Data sync result</font>'
    elif success_count == 0:
        title = '### <font color=#FF0000>Data sync result</font>'
    end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    result = {
        "msgtype": "markdown",
        "markdown": {
            "title": "Data sync result",
            "text": title + ' \n\n\n\n- '
                    + "Total tasks: " + str(total_count) + "\n\n- "
                    + "Successful tasks: " + str(success_count) + "\n\n- "
                    + "Failed tasks: " + str(total_count - success_count) + "\n\n- "
                    + "Start time: " + str(job_start_time) + "\n\n- "
                    + "End time: " + str(end_time) + "\n\n- "
                    + "Failed list: " + str(fail_task_list) + "\n\n "
}
}
if success_count < total_count:
        result['markdown']['at'] = {"atMobiles": ["12345678997"]}
real_send_msg(dd_secret, dd_access_token, result)
def run_job(dd_secret, dd_access_token, job_start_time, base_path: str, python_script_path: str, datax_json_path: str):
    """
    Run all sync tasks found under <base_path>/task/config/.
    :param dd_secret: DingTalk secret key
    :param dd_access_token: DingTalk access token
    :param job_start_time: job start time
    :param base_path: base path
    :param python_script_path: path to the Python executable
    :param datax_json_path: path to the DataX entry script
    :return: None
    """
task_content_list = read_all_files_in_directory(base_path + "/task/config/")
success_count = 0
total_count = len(task_content_list)
fail_task_list = []
for task_content in task_content_list:
try:
logging.info(f"开始生成,配置文件名称:{task_content}")
job_config = general_datax_job_config(task_content_list[task_content])
job_file_path = write_job_file(base_path, job_config)
shell_path = write_task_file(base_path, python_script_path, datax_json_path, job_file_path)
logging.info(f"shell脚本创建成功,路径为:{base_path}")
# 调用脚本
call_shell(shell_path)
success_count += 1
        except Exception as e:
            fail_task_list.append(task_content)
            logging.error(f"Config file {task_content} failed: {e}")
    # send the result notification
send_msg(dd_secret, dd_access_token, job_start_time, total_count, success_count, fail_task_list)
def call_shell(shell_path: str):
    """
    Execute a shell script.
    :param shell_path: path of the shell script
    :return: None
    """
    logging.info(f"Calling shell script, path: {shell_path}")
result = subprocess.run(shell_path,
check=True,
shell=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
    # log stdout, stderr and the return code of the script
    logging.info("Shell script %s stdout: %s", shell_path, result.stdout)
    logging.info("Shell script %s stderr: %s", shell_path, result.stderr)
    logging.info("Shell script %s return code: %s", shell_path, result.returncode)
if __name__ == '__main__':
"""
码中台数据同步任务脚本
使用前请修改如下配置信息:
- secret 钉钉机器人的秘钥
- access_token 钉钉机器人的token
- python_path Python的安装路径
- datax_path datax的执行文件路径
"""
# 钉钉配置
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
secret = ''
access_token = ''
python_path = "/usr/bin/python3"
datax_path = "/opt/datax-k/bin/datax.py"
    # directory of the current script file
script_dir = '/opt/data-job'
curr_date_day = datetime.now().strftime('%Y-%m-%d')
    # create the log directory
mkdir_if_not_exist(script_dir + "/logs/" + curr_date_day)
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(lineno)d - %(message)s',
                        filename=script_dir + '/logs/' + curr_date_day + '/app.log',
                        filemode='w')
run_job(secret, access_token, start_time, script_dir, python_path, datax_path)
logging.shutdown()