DataX数据同步无响应

出现原因:由于Starrocks设定了查询超时时间,DataX数据同步使用流式数据读取,导致数据读取超过了数据库指定的查询超时时间,数据读取被中断,DataX没有报错,出现了Speed一直为0的情况。

处理方法

  1. 可以暂时将数据库的query_timout参数调大,保证数据同步时间不会超过该值。

set global query_timeout=3000;
  1. 在当前SQL语句中设置query_timeout的值,详见:https://docs.starrocks.com/zh-cn/latest/reference/System_variable

SELECT /*+ SET_VAR(query_timeout = 1) */ name FROM people ORDER BY name;

具体说明:

  1. DataX的数据同步,采用的是使用java.sql.Statement从数据库拉取数据,并且将fetchSize设置成了Integer.MIN_VALUE, 该方式使用流数据接受方式,每次只从服务器接受部分数据,直到数据处理完毕。

源码如下:

/**
* 任务初始化
*/

public void init() {
    this.originalConfig = super.getPluginJobConf();

    Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
    if (userConfigedFetchSize != null) {
        LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
    }
	// 默认被设置为Integer.MIN_VALUE
    this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);

    this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
    this.commonRdbmsReaderJob.init(this.originalConfig);
}


/**
* 任务调用
**/
 public void startRead(RecordSender recordSender) {
    int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);

    this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
            super.getTaskPluginCollector(), fetchSize);
}


/**
* a wrapped method to execute select-like sql statement .
*
* @param conn         Database connection .
* @param sql          sql statement to be executed
* @param fetchSize
* @param queryTimeout unit:second
* @return
* @throws SQLException
*/
public static ResultSet query(Connection conn, String sql, int fetchSize, int queryTimeout)
    throws SQLException {
    // make sure autocommit is off
    conn.setAutoCommit(false);
    // ResultSet.RTYPE_FORWORD_ONLY,只可向前滚动;
    // ResultSet.CONCUR_READ_ONLY,指定不可以更新 ResultSet 
    Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
                                          ResultSet.CONCUR_READ_ONLY);

    // 指定了fetchSize为Integer.MIN_VALUE
    stmt.setFetchSize(fetchSize);
    stmt.setQueryTimeout(queryTimeout);
    return query(stmt, sql);
}
  1. 数据库中配置了数据的查询超时时间,Starrocks中该配置名称为query_timeout。默认值为300s。如果一个查询持续时间超过了该参数的值,数据库就会返回查询超时错误。

  1. DataX未将该异常抛出,导致程序没有中止,实际数据库的查询已经结束,所有出现了Speed为0的现象。

  1. 代码调试,当超过query_timeout时,抛出如下错误。

  1. 测试用例如下:

测试表信息:

  • 字段数:34

  • 表数据量:12965900条

  • 表大小:9.074 GB

不同query_timeout测试结果如下:

query_timeout值
成功导出数据量

30s

77792

300s

778208

3000s

7821522

job_json如下:

使用SQL设置变量的job如下

Last updated

Was this helpful?