import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable.ListBuffer

// Project-specific helpers (SparkHivePartitionOverwriteApplication, DropPartitionTools,
// MysqlToHiveTools, DataSourceUtils, DateUtils, FinalCode, SparkUtils) are assumed to be
// importable from the surrounding project.
object MarketMysqlToHiveEtl extends SparkHivePartitionOverwriteApplication {
  /**
   * Drop the partitions that already exist for the target date.
   *
   * @param spark      SparkSession instance
   * @param properties database configuration
   * @param date       date (yyyy-MM-dd)
   */
  def delete_partition(spark: SparkSession, properties: Properties, date: String): Unit = {
    val odsDatabaseName = properties.getProperty("hive.datasource.ods")
    DropPartitionTools.dropPartitionIfExists(spark, odsDatabaseName, "ods_t_money_record", "ds", date)
    DropPartitionTools.dropPartitionIfExists(spark, odsDatabaseName, "ods_t_account", "ds", date)
  }
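
  // DropPartitionTools is project code not shown here. A minimal sketch of what
  // dropPartitionIfExists is assumed to do (hypothetical, wrapping Hive DDL):
  //   spark.sql(s"ALTER TABLE $database.$table DROP IF EXISTS PARTITION ($partitionColumn='$date')")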
  /**
   * Extract one day of data from MySQL into the ODS layer.
   *
   * @param spark      SparkSession instance
   * @param properties database configuration
   * @param date       date (yyyy-MM-dd)
   */
  def loadData(spark: SparkSession, properties: Properties, date: String): Unit = {
    // Drop any previously loaded data for this date, so re-runs do not create duplicates
    delete_partition(spark, properties, date)
    // Resolve the data source configuration
    val odsDatabaseName = properties.getProperty("hive.datasource.ods")
    val dataSource = DataSourceUtils.getDataSourceProperties(FinalCode.MARKET_MYSQL_FILENAME, properties)
    var sql = s"select id,account_id,type,original_id,original_code,money,reason,user_type,user_id,organization_id," +
      s"create_time,update_time,detail,deleted,parent_id,counts,'${date}' AS ds from TABLENAME where date(update_time) = '${date}'"
    // Sync t_money_record incrementally
    MysqlToHiveTools.readFromMysqlIncrement(spark, dataSource, sql.replace("TABLENAME", "t_money_record"),
      s"${odsDatabaseName}.ods_t_money_record", SaveMode.Append, "ds")
    sql = s"select id,code,customer_code,name,mobile,type,organization_id,organization_name,create_time,update_time,deleted,status,customer_name," +
      s"customer_id,channel_type,nike_name,version,register_Time,'${date}' AS ds from TABLENAME where date(update_time) = '${date}'"
    // Sync t_account incrementally
    MysqlToHiveTools.readFromMysqlIncrement(spark, dataSource, sql.replace("TABLENAME", "t_account"),
      s"${odsDatabaseName}.ods_t_account", SaveMode.Append, "ds")
  }
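
  // MysqlToHiveTools is project code not shown here. readFromMysqlIncrement is assumed
  // to read the query over JDBC and append it into the partitioned Hive table, roughly:
  //   spark.read.jdbc(url, s"( $sql ) tmp", connectionProperties)
  //     .write.mode(saveMode).insertInto(targetTable)
  // Note that the queries above select the ds partition value as the last column,
  // matching the column-order semantics of insertInto.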
  /**
   * Data ETL: resolve the date(s) to sync, then load each one.
   *
   * @param spark      SparkSession instance
   * @param properties database configuration
   */
  def etl(spark: SparkSession, properties: Properties): Unit = {
    val sparkConf = spark.sparkContext.getConf
    // Date(s) to sync, defaulting to yesterday
    var lastDate = sparkConf.get("spark.etl.last.day", DateUtils.getLastDayString)
    val dateList = new ListBuffer[String]()
    if (lastDate.isEmpty) {
      // Configured as an empty string: fall back to yesterday
      lastDate = DateUtils.getLastDayString
    }
    if (lastDate.contains("~")) {
      // A date range: expand it into every day between the two endpoints
      val dateArray = lastDate.split("~")
      DateUtils.findBetweenDates(dateArray(0), dateArray(1)).foreach(it => dateList.append(it))
    } else if (lastDate.contains(",")) {
      // A comma-separated list of dates
      lastDate.split(",").foreach(it => dateList.append(it))
    } else {
      // A single date
      dateList.append(lastDate)
    }
    // Sync day by day
    dateList.foreach(it => loadData(spark, properties, it))
  }
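
  // spark.etl.last.day accepts three shapes (example values only):
  //   --conf spark.etl.last.day=2023-08-01              // a single day
  //   --conf spark.etl.last.day=2023-08-01~2023-08-05   // every day in the range, inclusive
  //   --conf spark.etl.last.day=2023-08-01,2023-08-03   // an explicit list of days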
  def main(args: Array[String]): Unit = {
    job() {
      val sparkAndProperties = SparkUtils.get()
      val spark = sparkAndProperties.spark
      val properties = sparkAndProperties.properties
      // Run the ETL
      etl(spark, properties)
    }
  }
}
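
// DateUtils also lives elsewhere in the project. Below is a minimal, self-contained
// sketch of the two members used above, assuming dates are formatted as yyyy-MM-dd;
// this is hypothetical reference code, not the project's actual implementation.
object DateUtilsSketch {
  import java.time.LocalDate
  import java.time.temporal.ChronoUnit

  // Yesterday as yyyy-MM-dd, mirroring DateUtils.getLastDayString
  def getLastDayString: String = LocalDate.now().minusDays(1).toString

  // Every day from start to end, inclusive, mirroring DateUtils.findBetweenDates
  def findBetweenDates(start: String, end: String): Seq[String] = {
    val s = LocalDate.parse(start)
    val e = LocalDate.parse(end)
    (0L to ChronoUnit.DAYS.between(s, e)).map(offset => s.plusDays(offset).toString)
  }
}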