Data Synchronization Based on Custom Scripts
Since adopting the Apache DolphinScheduler + StarRocks data stack, everything had been running smoothly, but as time went on new problems inevitably appeared.
As data volumes and user demands grew, and other factors came into play, the existing synchronization architecture came under considerable strain: maintaining and updating the jobs became increasingly tedious and time-consuming, and a new approach was urgently needed.
Because of security-compliance (MLPS) requirements, the production RDS databases can no longer be accessed over the public internet. Since StarRocks also lives on an internal network, the old synchronization link was completely broken and a new solution was required.
Frequent schema changes, scheduling failures caused by constrained server resources, and similar issues meant that data re-runs were needed more and more often. Every re-run required changing scheduling parameters (such as dates); with 10 business workflows already online, one full re-sync meant editing and re-scheduling all 10 of them one by one, with someone tracking their status the whole time. Even just updating the schedules was a heavy burden.
To repair the broken data link, after some research we decided to relay the data through Kafka: deploy a Kafka cluster on the internal network and expose a public endpoint for it; on the machine inside the RDS network, use DataX to write RDS data to Kafka over the public address; and inside the internal network, consume the data with Kafka Connect and write it into StarRocks.
Resources for the new setup were also limited: the original internal network provided four 8C32G servers, while the network hosting the new RDS could only offer a single server of at most 4C8G. We therefore dropped Apache DolphinScheduler and simply use crontab to invoke Python scripts that schedule the DataX jobs.
The new scheme has to solve two problems: how DataX writes data into Kafka, and how the Python scripts eliminate the painful parameter editing described above.
Writing to Kafka with DataX
DataX does not ship with a kafkawriter, so we implemented our own KafkaWriter plugin to meet this need; for data security we also wanted the data to be encrypted.
The DataX KafkaWriter implementation
public class KafkaWriter extends Writer {
public static class Job extends Writer.Job {
private static final Logger logger = LoggerFactory.getLogger(Job.class);
private Configuration conf = null;
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(conf);
}
return configurations;
}
private void validateParameter() {
this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
}
@Override
public void init() {
this.conf = super.getPluginJobConf();
logger.info("kafka writer params:{}", conf.toJSON());
this.validateParameter();
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private static final Logger logger = LoggerFactory.getLogger(Task.class);
private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
private Producer<String, String> producer;
private String fieldDelimiter;
private Configuration conf;
private Properties props;
private AesEncryption aesEncryption;
private List<String> columns;
@Override
public void init() {
this.conf = super.getPluginJobConf();
fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
columns = conf.getList(Key.COLUMN_LIST, new ArrayList<>(), String.class);
props = new Properties();
props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
props.put("acks", conf.getUnnecessaryValue(Key.ACK, "0", null));//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
props.put("retries", conf.getUnnecessaryValue(Key.RETRIES, "5", null));
props.put("retry.backoff.ms", "1000");
props.put("batch.size", conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
props.put("linger.ms", 100);
props.put("connections.max.idle.ms", 300000);
props.put("max.in.flight.requests.per.connection", 5);
props.put("socket.keepalive.enable", true);
props.put("key.serializer", conf.getUnnecessaryValue(Key.KEYSERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
props.put("value.serializer", conf.getUnnecessaryValue(Key.VALUESERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
producer = new KafkaProducer<String, String>(props);
String encryptKey = conf.getUnnecessaryValue(Key.ENCRYPT_KEY, null, null);
if(encryptKey != null){
aesEncryption = new AesEncryption(encryptKey);
}
}
@Override
public void prepare() {
AdminClient adminClient = AdminClient.create(props);
ListTopicsResult topicsResult = adminClient.listTopics();
String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
try {
if (!topicsResult.names().get().contains(topic)) {
    // The NewTopic must actually be added to the list passed to createTopics,
    // otherwise the topic is never created.
    NewTopic newTopic = new NewTopic(
            topic,
            Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
            Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
    );
    List<NewTopic> newTopics = new ArrayList<NewTopic>();
    newTopics.add(newTopic);
    adminClient.createTopics(newTopics);
}
adminClient.close();
} catch (Exception e) {
    throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.CREATE_TOPIC.getDescription());
}
}
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
logger.info("start to writer kafka");
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)
.equalsIgnoreCase(WriteType.TEXT.name())) {
producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
Md5Encrypt.md5Hexdigest(recordToString(record)),
aesEncryption ==null ? recordToString(record): JSONObject.toJSONString(aesEncryption.encrypt(recordToString(record))))
);
} else if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)
.equalsIgnoreCase(WriteType.JSON.name())) {
producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
Md5Encrypt.md5Hexdigest(recordToString(record)),
aesEncryption ==null ? recordToJsonString(record) : JSONObject.toJSONString(aesEncryption.encrypt(recordToJsonString(record))))
);
}
producer.flush();
}
}
@Override
public void destroy() {
if (producer != null) {
producer.close();
}
}
/**
* 数据格式化
*
* @param record
* @return
*/
private String recordToString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
return NEWLINE_FLAG;
}
Column column;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
sb.append(column.asString()).append(fieldDelimiter);
}
sb.setLength(sb.length() - 1);
sb.append(NEWLINE_FLAG);
return sb.toString();
}
/**
* 数据格式化
*
* @param record 数据
*
*/
private String recordToJsonString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
return "{}";
}
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < recordLength; i++) {
String key = columns.get(i);
Column column = record.getColumn(i);
map.put(key, column.getRawData());
}
return JSONObject.toJSONString(map);
}
}
}
The data-encryption implementation:
public class AesEncryption {
private SecretKey secretKey;
public AesEncryption(String secretKey) {
byte[] keyBytes = Base64.getDecoder().decode(secretKey);
this.secretKey = new SecretKeySpec(keyBytes, 0, keyBytes.length, "AES");
}
public String encrypt(String data) {
try {
Cipher cipher = Cipher.getInstance("AES");
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
byte[] encryptedBytes = cipher.doFinal(data.getBytes());
return Base64.getEncoder().encodeToString(encryptedBytes);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public String decrypt(String encryptedData) throws Exception {
Cipher cipher = Cipher.getInstance("AES");
cipher.init(Cipher.DECRYPT_MODE, secretKey);
byte[] decodedBytes = Base64.getDecoder().decode(encryptedData);
byte[] decryptedBytes = cipher.doFinal(decodedBytes);
return new String(decryptedBytes);
}
}
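The encryptKey expected by the writer is a Base64-encoded AES key: the constructor above Base64-decodes the string and uses the raw bytes directly as the key. A minimal sketch for generating a compatible 128-bit key (the key size and printing it to stdout are my own choices, not part of the original setup):
import base64
import os

# 16 random bytes = AES-128; Base64-encode them so the Java side can do
# Base64.getDecoder().decode(secretKey) and use the raw bytes as the key.
print(base64.b64encode(os.urandom(16)).decode("ascii"))
The resulting string is what goes into the writer's encrypt-key parameter and, later, into the Kafka Connect transform's secret.key setting.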
Public-network access for Kafka
To expose Kafka on both the internal and public networks, only the following settings in the server.properties file under kafka/config need to be changed.
# Listener ports: listen on both 9093 (internal) and 9092
listeners=INTERNAL://<Kafka node 3 internal IP>:9093,EXTERNAL://<Kafka node 3 internal IP>:9092
# Advertised addresses: the internal listener on 9093 and the public listener on 19092
advertised.listeners=INTERNAL://<Kafka node 3 internal IP>:9093,EXTERNAL://<public IP>:19092
# Security protocol for each listener
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# Listener used for inter-broker communication
inter.broker.listener.name=INTERNAL
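With this split-listener setup, DataX on the RDS side reaches the brokers through the public IP on port 19092, while internal consumers keep using 9093. A quick way to sanity-check the external listener is to produce a test message from the RDS-side machine; the sketch below assumes the kafka-python package and a throwaway topic, neither of which appears in the original setup:
from kafka import KafkaProducer  # pip install kafka-python (assumed, not part of the original stack)

# Connect through the advertised EXTERNAL listener (public IP, port 19092).
producer = KafkaProducer(bootstrap_servers="PUBLIC_IP:19092")
future = producer.send("connectivity-check", b"hello from the RDS network")
print(future.get(timeout=10))  # RecordMetadata on success, exception otherwise
producer.close()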
The custom configuration file
The Python script must automatically generate the DataX job configuration files and shell scripts and then schedule DataX to run them. After some research we settled on a custom configuration file: the script reads it, dynamically generates the corresponding DataX job JSON and the scheduling shell script, and then executes the task.
Custom configuration file, example 1:
{
"datasource": {
"host": "xxxxxx",
"port": "3306",
"username": "xxxxx",
"password": "xxxxxxx",
"properties": {
"characterEncoding": "utf-8",
"useSSL": "false",
"tinyInt1isBit": "false"
}
},
"table": {
"database": "app",
"table": "device",
"column": [
"Id AS id",
"CompanyName AS company_name",
"CompanyId AS company_id",
"SecretKey AS secret_key",
"Brand AS brand",
"ModelType AS model_type",
"Enable AS enable",
"CAST(CreateTime as CHAR) AS create_time",
"CAST(UpdateTime as CHAR) AS update_time"
],
"where": "date(UpdateTime) >= '$[yyyy-MM-dd-8]'",
"searchTableSql": []
},
"kafka": {
"topic": "mzt_ods_cjm.ods_device"
}
}
Example 2: a configuration file supporting sharded databases and tables
{
"datasource": {
"host": "xxxxxxx",
"port": "3306",
"username": "xxxxxxx",
"password": "xxxxxxxx",
"properties": {
"characterEncoding": "utf-8",
"useSSL": "false",
"tinyInt1isBit": "false"
}
},
"table": {
"database": "hydra_logistics_flow",
"table": "",
"column": [
"id",
"concat('t_logistics_sweep_out_code_flow_',DATE_FORMAT(create_time,'%Y')) AS table_name",
"cus_org_id",
"CAST(create_time as CHAR) AS create_time",
"replace_product_id",
"replace_product_name",
"replace_product_code"
],
"where": "date(create_time) >= '$[yyyy-MM-dd-8]'",
"searchTableSql": [
"SELECT concat('t_logistics_sweep_out_code_flow_',YEAR(SUBDATE(CURDATE(), 1))) AS TABLE_NAME",
"SELECT concat('t_logistics_sweep_out_code_flow_',YEAR(DATE_SUB(DATE_SUB(CURDATE(), INTERVAL 1 DAY), INTERVAL 1 YEAR))) AS TABLE_NAME"
]
},
"kafka": {
"topic": "mzt_ods_cjm.ods_t_logistics_sweep_out_code_flow"
}
}
The fields in the configuration files above are as follows:
datasource: the RDS data source
datasource.host: host of the RDS database
datasource.port: port of the RDS database
datasource.username: username for the RDS database
datasource.password: password for the RDS database
datasource.properties: JDBC connection parameters, appended to the URL as ?key=value&key=value
table: information about the table to synchronize
table.database: name of the RDS database
table.table: name of the table in RDS; may be left empty for sharded tables
table.column: the columns to synchronize; aliases and functions are supported
table.where: filter condition for the data to synchronize
table.searchTableSql: SQL statements that look up table names, used for dynamic sharding
kafka: Kafka-related settings
kafka.topic: name of the Kafka topic the data is written to
The Python scheduling script
import json
import os
import pymysql
import re
from datetime import datetime
from dateutil.relativedelta import relativedelta
import uuid
import subprocess
import logging
import hmac
import hashlib
import base64
import urllib.parse
import urllib
import requests
import time
from typing import List, Mapping
def list_files_in_directory(directory_path: str) -> List[str]:
"""
获取目录下的所有以.json结尾的文件
:param directory_path: 目录
:return: 文件列表
"""
entries = os.listdir(directory_path)
# 过滤出所有文件
files = [entry for entry in entries if
os.path.isfile(os.path.join(directory_path, entry)) and entry.endswith(".json")]
logging.info(f"读取配置文件数量:{len(files)}")
return files
def read_file_content(file_path: str) -> str:
"""
读取文件内容
:param file_path: 文件路径
:return: 文件内容
"""
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
return content
def read_all_files_in_directory(directory_path: str) -> Mapping[str, str]:
"""
读取文件夹下面的所有文件的内容
:param directory_path: 文件夹路径
:return: 内容map
"""
logging.info(f"开始读取所有的配置文件信息")
files = list_files_in_directory(directory_path)
file_contents = {}
for file in files:
file_path = os.path.join(directory_path, file)
content = read_file_content(file_path)
file_contents[file] = content
    sorted_items = sorted(file_contents.items())
    sorted_dict = dict(sorted_items)
    return sorted_dict
def search_table_list(datasource: json, search_table_sql_list: List[str]) -> List[str]:
"""
执行语句获取表信息
:param datasource: 数据源信息
:param search_table_sql_list: 查询表的SQL语句
:return: 表列表
"""
logging.info(f"开始查询需要同步的表")
host = datasource['host']
port = int(datasource['port'])
username = datasource['username']
password = datasource['password']
conn = pymysql.connect(host=host,
port=port,
user=username,
passwd=password,
db='',
charset='utf8',
connect_timeout=200,
autocommit=True,
read_timeout=2000
)
table_name_list = []
for search_table_sql in search_table_sql_list:
search_table_sql = parse_where_sql(search_table_sql)
with conn.cursor() as cursor:
cursor.execute(query=search_table_sql)
while 1:
res = cursor.fetchone()
if res is None:
break
table_name_list.append(res[0])
return table_name_list
def general_default_job_config() -> json:
"""
生成默认的datax配置
:return: 默认的配置
"""
default_job_json = """
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "test",
"password": "test1234",
"connection": [
{
"querySql": [
"SELECT id, code from test.t_open_api_classify"
],
"jdbcUrl": [
"jdbc:mysql://IP:3306/test?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
]
}
]
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"bootstrapServers": "IP:9092,IP:9092,IP:9092",
"topic": "test-m-t-k",
"ack": "all",
"batchSize": 1000,
"retries": 0,
"keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
"valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
"fieldDelimiter": ",",
"writeType": "json",
"topicNumPartition": 1,
"topicReplicationFactor": 1,
"encryptionKey": "5s8FGjerddfWkG/b64CGHHZYvQ=="
}
}
}
]
}
}
"""
    return json.loads(default_job_json)
def general_jdbc_url(json_config: json) -> str:
"""
根据数据源信息生成jdbc url
:param json_config: 配置
:return: jdbc url
"""
logging.info(f"开始解析jdbc url")
host = json_config['datasource']['host']
port = int(json_config['datasource']['port'])
database = json_config['table']['database']
url = "jdbc:mysql://{}:{}/{}".format(host, port, database)
# 解下properties
properties = json_config['datasource']['properties']
properties_list = []
if properties is not None and len(properties) > 0:
for key, value in properties.items():
properties_list.append(key + "=" + str(value))
url = url + "?" + "&".join(properties_list)
logging.info(f"jdbc url: {url}")
return url
def parse_where_sql(where_sql: str) -> str:
"""
解析where语句
:param where_sql: 原始where语句
:return: 转换之后的where语句
"""
# 定义支持的类型 $[yyyyMMdd+N_Y] $[yyyyMMdd-N_Y]
# 正则表达式模式
logging.info(f"还是解析where语句:where_sql: {where_sql}")
pattern = r"\$\[.*?\]"
return re.sub(pattern, replacement_function, where_sql)
def replacement_function(match):
"""
替换函数
:param match: 匹配结果
:return: 替换之后的结果
"""
matched_text = match.group(0)
return calc_datetime(matched_text)
def calc_datetime(expression: str) -> str:
"""
计算时间表达式
:param expression: 表达式
:return: 计算之后的值
"""
logging.info(f"开始计算时间参数:expression: {expression}")
# 设置映射
format_units = {
"yyyy": "%Y",
"MM": "%m",
"dd": "%d",
"HH": "%H",
"mm": "%M",
"ss": "%S"
}
unit_map = {
"Y": "yyyy",
"M": "MM",
"d": "dd",
"H": "HH",
"m": "mm",
"s": "ss"
}
# 解析参数
expression = expression[2:-1]
# 判断其开头,截取尾部
min_unit = None
for key, value in format_units.items():
if key in expression:
min_unit = key
expression = expression.replace(key, value)
# 替换完毕,确定是否有数字
logging.info(f"转换为Python格式的表达式:expression: {expression}")
# 定义正则表达式模式
pattern = r'([^0-9]+)([-+]\d+(\*\d+)?)(?:_([YMdHms]))?'
matches = re.match(pattern, expression)
# 输出拆分结果
if matches:
date_part = matches.group(1)
remainder = matches.group(2)
unit = matches.group(4)
if unit is not None and unit in unit_map.keys():
min_unit = unit_map[unit]
return calculate_expression(min_unit, date_part, remainder)
else:
return expression
def calculate_expression(min_unit: str, date_part: str, remainder: str) -> str:
"""
计算表达式
:param min_unit: 最小单位
:param date_part: 日期表达式部分
:param remainder: 偏移量部分
:return: 计算之后的结果
"""
logging.info(f"开始计算表达式:min_unit: {min_unit}, date_part: {date_part}, remainder:{remainder}")
# 获取当前日期和时间
now = datetime.now()
# 计算时间的偏移量
if remainder is None:
# 格式化的日期
formatted_datetime = now.strftime(date_part)
logging.info(f"日期偏移量为空,返回值:{formatted_datetime}")
return formatted_datetime
else:
# 计算偏移量
plus_or_sub = remainder[0:1]
offset = eval(remainder[1:])
logging.info(f"计算偏移量,plus_or_sub:{plus_or_sub}, offset:{offset}")
if min_unit == 'yyyy':
if plus_or_sub == '-':
now = now - relativedelta(years=offset)
else:
now = now + relativedelta(years=offset)
elif min_unit == 'MM':
if plus_or_sub == '-':
now = now - relativedelta(months=offset)
else:
now = now + relativedelta(months=offset)
elif min_unit == 'dd':
if plus_or_sub == '-':
now = now - relativedelta(days=offset)
else:
now = now + relativedelta(days=offset)
elif min_unit == 'HH':
if plus_or_sub == '-':
now = now - relativedelta(hours=offset)
else:
now = now + relativedelta(hours=offset)
elif min_unit == 'mm':
if plus_or_sub == '-':
now = now - relativedelta(minutes=offset)
else:
now = now + relativedelta(minutes=offset)
elif min_unit == 'ss':
if plus_or_sub == '-':
now = now - relativedelta(seconds=offset)
else:
now = now + relativedelta(seconds=offset)
formatted_datetime = now.strftime(date_part)
logging.info(f"日期偏移量为空,返回值:{formatted_datetime}")
return formatted_datetime
def general_reader(json_config: json) -> json:
"""
生成配置的reader部分
:param json_config: 配置
:return: JSON结果
"""
logging.info(f"开始生成DataX的配置JSON文件的reader内容")
    reader_json = json.loads("{}")
reader_json['name'] = "mysqlreader"
reader_json['parameter'] = {}
reader_json['parameter']['username'] = json_config['datasource']['username']
reader_json['parameter']['password'] = json_config['datasource']['password']
reader_json['parameter']['column'] = json_config['table']['column']
reader_json['parameter']['connection'] = [{}]
reader_json['parameter']['connection'][0]['table'] = json_config['table']['table']
reader_json['parameter']['connection'][0]['jdbcUrl'] = [general_jdbc_url(json_config)]
where_sql = json_config['table']['where']
if where_sql is not None and where_sql != '':
reader_json['parameter']['where'] = parse_where_sql(where_sql)
return reader_json
def general_writer(json_config: json) -> json:
"""
生成配置的Writer部分
:param json_config: 配置
:return: JSON结果
"""
columns = json_config['table']['column']
new_columns = []
for column in columns:
column = str(column).replace("`", "")
if " AS " in str(column).upper():
new_columns.append(str(column).split(" AS ")[1].strip())
else:
new_columns.append(str(column).strip())
logging.info(f"开始生成DataX的配置JSON文件的Writer内容")
    writer_json = json.loads("{}")
writer_json['name'] = "kafkawriter"
writer_json['parameter'] = {}
writer_json['parameter']['bootstrapServers'] = "IP:19092,IP:19093,IP:19094"
writer_json['parameter']['topic'] = json_config['kafka']['topic']
writer_json['parameter']['ack'] = "all"
writer_json['parameter']['batchSize'] = 1000
writer_json['parameter']['retries'] = 3
writer_json['parameter']['keySerializer'] = "org.apache.kafka.common.serialization.StringSerializer"
writer_json['parameter']['valueSerializer'] = "org.apache.kafka.common.serialization.StringSerializer"
writer_json['parameter']['fieldDelimiter'] = ","
writer_json['parameter']['writeType'] = "json"
writer_json['parameter']['topicNumPartition'] = 1
writer_json['parameter']['topicReplicationFactor'] = 1
writer_json['parameter']['encryptionKey'] = "5s8FGjerddfWkG/b64CGHHZYvQ=="
writer_json['parameter']['column'] = new_columns
return writer_json
def general_datax_job_config(datax_config: str):
"""
生成job的配置内容
:param datax_config: 配置
:return: 完整的JSON内容
"""
logging.info(f"开始生成DataX的配置JSON文件内容, {datax_config}")
    json_config = json.loads(datax_config)
# 判定是否需要查询表
datasource = json_config['datasource']
table = json_config['table']['table']
search_table_sql_list = json_config['table']['searchTableSql']
if search_table_sql_list is not None and len(search_table_sql_list) > 0:
# 查询表列表,覆盖原来的配置信息
table_list = search_table_list(datasource, search_table_sql_list)
else:
table_list = [table]
json_config['table']['table'] = table_list
# 开始生成配置文件
job_json = general_default_job_config()
job_json['job']['content'][0]['reader'] = general_reader(json_config)
job_json['job']['content'][0]['writer'] = general_writer(json_config)
return job_json
def write_job_file(base_path: str, job_config: json) -> str:
"""
生成job的JSON配置文件
:param base_path: 根路径
:param job_config: 配置信息
:return: 完整的JSON文件路径
"""
# 生成一个脚本
logging.info(f"开始创建DataX的配置JSON文件")
date_day = datetime.now().strftime('%Y-%m-%d')
timestamp_milliseconds = int(datetime.now().timestamp() * 1000)
# 生成UUID
file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".json"
# 完整文件路径
# 创建文件夹
mkdir_if_not_exist(base_path + "/task/datax/json/" + date_day)
complex_file_path = base_path + "/task/datax/json/" + date_day + "/" + file_name
logging.info(f"完整的DataX的配置JSON文件路径:{complex_file_path}")
with open(complex_file_path, 'w+', encoding='utf-8') as f:
f.write(json.dumps(job_config, ensure_ascii=False))
return complex_file_path
def mkdir_if_not_exist(path):
"""
创建目录
:param path: 目录路径
:return: None
"""
os.makedirs(path, exist_ok=True)
def write_task_file(base_path: str, python_path: str, datax_path: str, job_file_path: str) -> str:
"""
写shell脚本文件
:param base_path: 跟路径
:param python_path: python执行文件路径
:param datax_path: datax执行文件路径
:param job_file_path: JSON配置文件路径
:return: shell脚本的完整路径
"""
# 组合内容
logging.info(f"开始创建Shell脚本文件")
task_content = python_path + " " + datax_path + " " + job_file_path
# 生成一个脚本
date_day = datetime.now().strftime('%Y-%m-%d')
timestamp_milliseconds = int(datetime.now().timestamp() * 1000)
# 生成UUID
task_file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".sh"
# 完整文件路径
# 创建文件夹
mkdir_if_not_exist(base_path + "/task/datax/shell/" + date_day)
complex_file_path = base_path + "/task/datax/shell/" + date_day + "/" + task_file_name
logging.info(f"完整的shell脚本路径: {complex_file_path}")
with open(complex_file_path, 'w+', encoding='utf-8') as f:
f.write(task_content)
# 添加执行权限
current_permissions = os.stat(complex_file_path).st_mode
# 添加执行权限 (权限值 0o111 表示用户、组和其他人的执行权限)
new_permissions = current_permissions | 0o111
# 使用 os.chmod 设置新的权限
os.chmod(complex_file_path, new_permissions)
return complex_file_path
def signs(dd_secret: str, timestamp: str) -> str:
"""
钉钉机器人签名
:param dd_secret: 秘钥
:param timestamp: 时间戳
:return: 签名
"""
secret_enc = dd_secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, dd_secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote(base64.b64encode(hmac_code))
return sign
def real_send_msg(dd_secret: str, dd_access_token: str, text: json):
"""
发送钉钉机器人消息
:param dd_secret: 秘钥
:param dd_access_token: token
:param text: 内容
:return: None
"""
timestamp = str(round(time.time() * 1000))
sign = signs(dd_secret, timestamp)
headers = {'Content-Type': 'application/json'}
    web_hook = f'https://oapi.dingtalk.com/robot/send?access_token={dd_access_token}&timestamp={timestamp}&sign={sign}'
# 定义要发送的数据
requests.post(web_hook, data=json.dumps(text), headers=headers)
def send_msg(dd_secret: str, dd_access_token: str, job_start_time: str, total_count: int, success_count: int, fail_task_list: List[str]):
"""
组合钉钉消息
:param dd_secret: 秘钥
:param dd_access_token: token
:param job_start_time: 任务开始时间
:param total_count: 总任务数
:param success_count: 成功任务数
:return: NONE
"""
title = '### <font color=#CCCC00>数据同步结果</font>'
if success_count == total_count:
title = '### <font color=#00FF00>数据同步结果</font>'
elif success_count == 0:
title = '### <font color=#FF0000>数据同步结果</font>'
end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
result = {
"msgtype": "markdown",
"markdown": {
"title": "数据同步结果",
"text": title + ' \n\n\n\n- '
+ "总同步任务数:" + str(total_count) + "\n\n- "
+ "成功任务数:" + str(success_count) + "\n\n- "
+ "失败任务数" + str(total_count - success_count) + "\n\n- "
+ "开始时间:" + str(job_start_time) + "\n\n- "
+ "结束时间:" + str(end_time) + "\n\n- "
+ "失败列表:" + str(fail_task_list) + "\n\n "
}
}
if success_count < total_count:
result['markdown']['at'] = json.loads("{\"atMobiles\": [\"12345678997\"]}")
real_send_msg(dd_secret, dd_access_token, result)
def run_job(dd_secret, dd_access_token, job_start_time, base_path: str, python_script_path: str, datax_json_path: str):
"""
运行任务
:param dd_secret: 秘钥
:param dd_access_token: token
:param job_start_time: 任务开始时间
:param base_path: 根路径
:param python_script_path: Python执行路径
:param datax_json_path: datax执行路径
:return: NONE
"""
task_content_list = read_all_files_in_directory(base_path + "/task/config/")
success_count = 0
total_count = len(task_content_list)
fail_task_list = []
for task_content in task_content_list:
try:
logging.info(f"开始生成,配置文件名称:{task_content}")
job_config = general_datax_job_config(task_content_list[task_content])
job_file_path = write_job_file(base_path, job_config)
shell_path = write_task_file(base_path, python_script_path, datax_json_path, job_file_path)
logging.info(f"shell脚本创建成功,路径为:{base_path}")
# 调用脚本
call_shell(shell_path)
success_count += 1
except Exception as e:
fail_task_list.append(task_content)
logging.error(f"配置文件:{task_content} 执行失败", e)
# 发送消息
send_msg(dd_secret, dd_access_token, job_start_time, total_count, success_count, fail_task_list)
def call_shell(shell_path: str):
"""
执行shell脚本
:param shell_path: shell脚本路径
:return: NONE
"""
logging.info(f"调用shell脚本,路径为:{shell_path}")
result = subprocess.run(shell_path,
check=True,
shell=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# 输出标准输出
logging.info(f"shell脚本{shell_path}标准输出:%s", result.stdout)
# # 输出标准错误输出
logging.info(f"shell脚本{shell_path}标准错误输出:%s", result.stderr)
# # 输出返回码
logging.info(f"shell脚本{shell_path}的返回码:%s", result.returncode)
if __name__ == '__main__':
"""
码中台数据同步任务脚本
使用前请修改如下配置信息:
- secret 钉钉机器人的秘钥
- access_token 钉钉机器人的token
- python_path Python的安装路径
- datax_path datax的执行文件路径
"""
# 钉钉配置
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
secret = ''
access_token = ''
python_path = "/usr/bin/python3"
datax_path = "/opt/datax-k/bin/datax.py"
# 当前脚本文件的目录路径
script_dir = '/opt/data-job'
curr_date_day = datetime.now().strftime('%Y-%m-%d')
# 创建文件夹
mkdir_if_not_exist(script_dir + "/logs/" + curr_date_day)
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(lineno)d - %(message)s',
                        filename=script_dir + '/logs/' + curr_date_day + '/app.log',
filemode='w')
run_job(secret, access_token, start_time, script_dir, python_path, datax_path)
logging.shutdown()
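As mentioned earlier, with DolphinScheduler gone this script is driven by crontab. A minimal entry could look like the following; the 02:00 schedule, the script file name data_sync.py and the cron log path are illustrative assumptions, only /opt/data-job and /usr/bin/python3 come from the script itself:
# Run the data-sync scheduler every day at 02:00 (schedule and file names are assumptions)
0 2 * * * cd /opt/data-job && /usr/bin/python3 /opt/data-job/data_sync.py >> /opt/data-job/logs/cron.log 2>&1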
Controlling the synchronization date
The biggest pain point in the old setup was changing dates, so we needed a much simpler way to update them in bulk. The scheduling script above already parses a date expression: we defined a custom time expression of the form $[yyyyMMddHHmmss+/-N_Y], and by parsing it we can produce whatever time we need. Its components mean the following:
yyyy: the year
MM: the month
dd: the day
HH: the hour (24-hour clock)
mm: the minute
ss: the second
+N: add N to the current time
-N: subtract N from the current time
_Y: the unit of the offset, one of Y, M, d, H, m, s (year, month, day, hour, minute, second)
By parsing this expression we can produce a time string in any format, before or after the current moment, and use it in the WHERE condition of a sync job; that is all it takes to resolve the date.
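As a concrete example, the parse_where_sql/calc_datetime functions in the scheduling script above resolve a WHERE template against the current date at generation time; the rendered value below assumes the script runs on 2024-11-27 and is only illustrative:
# Template taken from the example configuration files.
where_template = "date(UpdateTime) >= '$[yyyy-MM-dd-8]'"

# parse_where_sql() rewrites every $[...] token via calc_datetime():
# yyyy-MM-dd maps to %Y-%m-%d, and the trailing -8 is applied using the smallest
# unit found in the pattern (days here), so on 2024-11-27 this prints
#   date(UpdateTime) >= '2024-11-19'
print(parse_where_sql(where_template))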
How the dates are updated
The date can now be computed, but we also need to batch-modify the time expressions in the WHERE conditions of the configuration files. For example, to synchronize the data from 8 days ago we change the expression to $[yyyyMMdd-8_d], meaning the current date minus 8 days, which syncs exactly that day's data. But we may instead want to synchronize everything from 8 days ago until now, in which case we also want to batch-modify the comparison operator in the WHERE expression, e.g. change = to >=.
Based on these requirements we wrote another Python script: with a simple configuration it rewrites the time expression in the WHERE condition of every configuration file in one go. Now we only need to run two scripts and everything is done; no more editing and running 10 workflows one by one.
import json
import os
import logging
from typing import List, Mapping
import re
from datetime import datetime, date
def list_files_in_directory(directory_path: str) -> List[str]:
"""
获取目录下的所有以.json结尾的文件
:param directory_path: 目录
:return: 文件列表
"""
entries = os.listdir(directory_path)
# 过滤出所有文件
files = [entry for entry in entries if
os.path.isfile(os.path.join(directory_path, entry)) and entry.endswith(".json")]
logging.info(f"读取配置文件数量:{len(files)}")
return files
def read_file_content(file_path: str) -> str:
"""
读取文件内容
:param file_path: 文件路径
:return: 文件内容
"""
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
return content
def read_all_files_in_directory(directory_path: str) -> Mapping[str, str]:
"""
读取文件夹下面的所有文件的内容
:param directory_path: 文件夹路径
:return: 内容map
"""
logging.info(f"开始读取所有的配置文件信息")
files = list_files_in_directory(directory_path)
file_contents = {}
for file in files:
file_path = os.path.join(directory_path, file)
content = read_file_content(file_path)
file_contents[file] = content
    sorted_items = sorted(file_contents.items())
    sorted_dict = dict(sorted_items)
    return sorted_dict
def parse_where_sql(where_sql: str, sub_day: int, comparator: str = None) -> str:
"""
解析where语句
:param where_sql: 原始where语句
:param sub_day: 天数
:param comparator: 比较符 包括 = != > < >= <=
:return: 转换之后的where语句
"""
# 定义支持的类型 $[yyyyMMdd+N_Y] $[yyyyMMdd-N_Y]
# 正则表达式模式
pattern = r'\$(\[.*?\])'
matches = re.finditer(pattern, where_sql)
for match in matches:
matched_text = match.group(1)
new_search = calc_datetime(matched_text, sub_day)
where_sql = where_sql.replace(matched_text, new_search)
    legal_comparator_list = ['<>', '!=', '>=', '<=', '>', '<', '=']
    legal_default = '@'
    if comparator is not None:
        for legal_comparator in legal_comparator_list:
            if legal_comparator in where_sql:
                where_sql = where_sql.replace(legal_comparator, legal_default)
                where_sql = where_sql.replace(legal_default, comparator)
                # Stop after the first (longest) match so the newly inserted
                # comparator is not rewritten again by a shorter operator.
                break
return where_sql
def calc_datetime(expression: str, sub_day: int) -> str:
"""
计算时间表达式
:param expression: 表达式
:param sub_day: 天数
:return: 计算之后的值
"""
# 替换完毕,确定是否有数字
# 定义正则表达式模式
pattern = r'([^0-9]+)([-+]\d+(\*\d+)?)(?:_([YMdHms]))?'
matches = re.match(pattern, expression)
# 输出拆分结果
if matches:
date_part = matches.group(1)
remainder = matches.group(2)
unit = matches.group(4)
plus_or_sub = remainder[0:1]
if unit is not None:
return date_part + plus_or_sub + str(sub_day) + '_' + unit + "]"
else:
return date_part + plus_or_sub + str(sub_day) + "]"
else:
return expression
def check_parma(formatted_date: str, sub_day: int, comparator: str = None):
"""
校验参数是否合法
:param formatted_date: 格式化日期
:param sub_day: 天数
:param comparator: 操作符
:return: NONE
"""
    legal_comparator = ['=', '<>', '!=', '>', '>=', '<', '<=']
    if formatted_date is None and sub_day is None:
        raise ValueError("formatted_date 和 sub_day不能同时为空")
    if formatted_date is not None:
        try:
            datetime.strptime(formatted_date, "%Y-%m-%d")
        except Exception as _:
            raise ValueError("formatted_date 必须是一个完整的yyyy-MM-dd日期格式, 当前formatted_date={}".format(formatted_date))
    if formatted_date is None and not isinstance(sub_day, int):
        raise ValueError("sub_day 必须是一个整数, 当前sub_day={}".format(sub_day))
    if comparator is not None and comparator not in legal_comparator:
        raise ValueError("comparator 不合法,合法操作列表为:{} 当前comparator={}".format(legal_comparator, comparator))
def update_file(base_path: str, sub_day: int, comparator: str = None):
"""
更新配置文件
:param base_path 配置文件根目录
:param sub_day 要减去的天数
:param comparator 比较符
"""
file_dict = read_all_files_in_directory(base_path)
for key, value in file_dict.items():
        json_data = json.loads(value)
where_sql = json_data['table']['where']
if where_sql is not None:
new_where_sql = parse_where_sql(where_sql, sub_day, comparator)
json_data['table']['where'] = new_where_sql
search_tal_sql_list = json_data['table']['searchTableSql']
if search_tal_sql_list is not None:
new_search_table_sql_list = []
for search_tal_sql in search_tal_sql_list:
new_search_table_sql = parse_where_sql(search_tal_sql, sub_day)
new_search_table_sql_list.append(new_search_table_sql)
json_data['table']['searchTableSql'] = new_search_table_sql_list
with open(base_path + "/" + key, "w+", encoding='utf-8') as f:
f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
print("{} 更新完成".format(key))
if __name__ == '__main__':
"""
更新数据同步配置文件的日期
"""
dir_path = r'/opt/data-job/task/config'
# 多少天前
day = 6
# 要指定的日期
date_format = '2024-11-19'
# where表达式的条件
comparator_symbol = '>='
check_parma(date_format, day, comparator_symbol)
if date_format is not None:
# 使用date_format的值覆盖day
single_date = datetime.strptime(date_format, "%Y-%m-%d").date()
current_date = date.today()
day = (current_date - single_date).days
update_file(dir_path, day, comparator_symbol)
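As an example of the effect, running this script with day = 6 (or date_format = '2024-11-19' when that is six days before the run date) and comparator_symbol = '>=' rewrites the WHERE clause of every configuration file in place; a sketch of the result using the functions above:
# Before, in task/config/<some>.json:    "where": "date(UpdateTime) = '$[yyyy-MM-dd-8]'"
# After update_file(dir_path, 6, '>='):  "where": "date(UpdateTime) >= '$[yyyy-MM-dd-6]'"
print(parse_where_sql("date(UpdateTime) = '$[yyyy-MM-dd-8]'", 6, '>='))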
Synchronizing data to StarRocks via Kafka Connect
The starrocks-connector-for-kafka implementation
StarRocks officially provides starrocks-connector-for-kafka; we only need to add our data-decryption logic to it and it can be used directly.
package com.starrocks.connector.kafka.transforms;
public class DecryptJsonTransformation <R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger LOG = LoggerFactory.getLogger(DecryptJsonTransformation.class);
private AesEncryption aesEncryption;
private interface ConfigName {
String SECRET_KEY = "secret.key";
}
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.SECRET_KEY, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "secret key");
@Override
public R apply(R record) {
if (record.value() == null) {
return record;
}
String value = (String) record.value();
try {
String newValue = aesEncryption.decrypt(value);
JSONObject jsonObject = JSON.parseObject(newValue, JSONReader.Feature.UseBigDecimalForDoubles);
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), null, jsonObject, record.timestamp());
} catch (Exception e) {
return record;
}
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, map);
String secretKey = config.getString(ConfigName.SECRET_KEY);
aesEncryption = new AesEncryption(secretKey);
}
}
The decryption logic
package com.starrocks.connector.kafka;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
public class AesEncryption {
private SecretKey secretKey;
public AesEncryption(String secretKey) {
byte[] keyBytes = Base64.getDecoder().decode(secretKey);
this.secretKey = new SecretKeySpec(keyBytes, 0, keyBytes.length, "AES");
}
public String encrypt(String data) {
try {
Cipher cipher = Cipher.getInstance("AES");
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
byte[] encryptedBytes = cipher.doFinal(data.getBytes());
return Base64.getEncoder().encodeToString(encryptedBytes);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public String decrypt(String encryptedData) throws Exception {
Cipher cipher = Cipher.getInstance("AES");
cipher.init(Cipher.DECRYPT_MODE, secretKey);
byte[] decodedBytes = Base64.getDecoder().decode(encryptedData);
byte[] decryptedBytes = cipher.doFinal(decodedBytes);
return new String(decryptedBytes);
}
}
2. Configuring the Kafka Connect task
{
"name": "mzt_ods_cjm.ods_device-connect",
"config": {
"connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
"topics": "mzt_ods_cjm.ods_device",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",
"starrocks.http.url": "IP:8050,IP:8050,IP:8050",
"starrocks.topic2table.map": "mzt_ods_cjm.ods_device:ods_device",
"starrocks.username": "xxxxxxx",
"starrocks.password": "xxxxxx",
"starrocks.database.name": "ods_cjm",
"sink.properties.strip_outer_array": "true",
"sink.properties.columns": "id,company_name,company_id,secret_key,",
"sink.properties.jsonpaths": "[\"$.id\",\"$.company_name\",\"$.company_id\",\"$.secret_key\"]",
"transforms": "decrypt",
"transforms.decrypt.type": "com.starrocks.connector.kafka.transforms.DecryptJsonTransformation",
"transforms.decrypt.secret.key": "5s8ekjRWkG/b64CGHHZYvQ=="
}
}
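This definition is submitted to the Kafka Connect REST API (see the API list below). A minimal sketch with Python requests; the worker address connect-host:8083 and the local file name are assumptions:
import json
import requests

# POST /connectors creates a new connector; the body carries "name" and "config".
with open("mzt_ods_cjm.ods_device-connect.json", "r", encoding="utf-8") as f:  # hypothetical file holding the JSON above
    connector = json.load(f)

resp = requests.post(
    "http://connect-host:8083/connectors",  # assumed Kafka Connect worker REST endpoint
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
print(resp.status_code, resp.text)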
starrocks-connector-for-kafka: the StarRocks connector for Kafka
DataX: batch data synchronization tool
kafka-console-ui: a Kafka web console
StarRocks-kafka-Connector: loading data into StarRocks via Kafka Connect
The Kafka Connect API list
GET /connectors: return the list of active connectors
POST /connectors: create a new connector; the request body should be a JSON object containing a string name field and a config object field with the connector's configuration parameters
GET /connectors/{name}: get information about a specific connector
GET /connectors/{name}/config: get the configuration parameters of a specific connector
PUT /connectors/{name}/config: update the configuration parameters of a specific connector
GET /connectors/{name}/status: get the connector's current status, including whether it is running, failed or paused, which worker it is assigned to, the error information if it failed, and the state of all its tasks
GET /connectors/{name}/tasks: get the list of tasks currently running for the connector
GET /connectors/{name}/tasks/{taskid}/status: get a task's current status, including whether it is running, failed or paused, which worker it is assigned to, and the error information if it failed
PUT /connectors/{name}/pause: pause the connector and its tasks; message processing stops until the connector is resumed
PUT /connectors/{name}/resume: resume a paused connector (or do nothing if the connector is not paused)
POST /connectors/{name}/restart: restart the connector (typically after a failure)
POST /connectors/{name}/tasks/{taskId}/restart: restart an individual task (typically after a failure)
DELETE /connectors/{name}: delete the connector, stopping all its tasks and removing its configuration
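In day-to-day operation the same API covers monitoring and recovery; for example (connect-host:8083 is again an assumed worker address):
import requests

base = "http://connect-host:8083"  # assumed Kafka Connect worker address
name = "mzt_ods_cjm.ods_device-connect"

# GET /connectors/{name}/status shows the connector state and each task's state.
print(requests.get(f"{base}/connectors/{name}/status").json())

# POST /connectors/{name}/restart restarts the connector after a failure.
requests.post(f"{base}/connectors/{name}/restart")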