使用 JdbcTemplate 优化向 Snowflake 批量插入数百万条记录

问题描述 投票:0回答:1

在使用 Spring Boot 的 JdbcTemplate 时，如何优化向 Snowflake 数据库批量插入数百万条记录？

/**
 * Inserts every map in {@code dataMapList} as one row of the target table with a single
 * {@code JdbcTemplate.batchUpdate} call, executed on a {@code boundedElastic} worker.
 *
 * <p>The column list is taken from the key set of the first map. Each row is bound by looking
 * those keys up, so a key missing from a later map is bound as {@code null} and keys that are
 * not in the first map are ignored.
 *
 * <p>The work is wrapped in {@code Mono.fromCallable}, so failures are delivered through the
 * returned {@code Mono}'s error signal rather than thrown from this method; the caller has to
 * subscribe and handle that signal to observe them.
 *
 * @param dataMapList  rows to insert, column name to value; {@code null} or empty is a no-op
 * @param jdbcUrl      JDBC URL, passed to {@code getOrCreateDataSource} and {@code dbSchema}
 * @param username     user name passed to {@code getOrCreateDataSource}
 * @param password     password passed to {@code getOrCreateDataSource}
 * @param targetObject target table name, resolved through {@code dbSchema}
 * @return a {@code Mono} that completes empty on success, or errors with a
 *         {@code RuntimeException} wrapping the original failure
 */
public Mono<Void> saveData(List<Map<String, Object>> dataMapList, String jdbcUrl, String username, String password, String targetObject) {
    return Mono.<Void>fromCallable(() -> {
        // Nothing to insert: return before touching the connection pool at all.
        // A null result makes fromCallable complete empty.
        if (dataMapList == null || dataMapList.isEmpty()) {
            return null;
        }

        try {
            // Column order is fixed once, from the first row, and reused for every row.
            List<String> columnNames = new ArrayList<>(dataMapList.get(0).keySet());
            if (columnNames.isEmpty()) {
                throw new IllegalArgumentException("First row has no columns; cannot build INSERT for " + targetObject);
            }

            StringBuilder placeholders = new StringBuilder();
            for (int i = 0; i < columnNames.size(); i++) {
                placeholders.append(i == 0 ? "?" : ", ?");
            }

            // NOTE(review): table and column identifiers are concatenated into the SQL text
            // because JDBC cannot bind identifiers. If map keys can come from untrusted
            // input, validate them against an allow-list before this point.
            String sql = "INSERT INTO " + dbSchema(jdbcUrl, targetObject)
                    + "(" + String.join(", ", columnNames) + ") VALUES (" + placeholders + ")";

            // One Object[] per row, values in the same order as columnNames.
            List<Object[]> batchArgs = new ArrayList<>(dataMapList.size());
            for (Map<String, Object> data : dataMapList) {
                Object[] values = new Object[columnNames.size()];
                for (int i = 0; i < values.length; i++) {
                    values[i] = data.get(columnNames.get(i));
                }
                batchArgs.add(values);
            }

            // JdbcTemplate obtains and releases its own connection from the data source, so
            // no connection is checked out here; holding a second, unused one for the whole
            // insert would only tie up a pool slot.
            HikariDataSource dataSource = getOrCreateDataSource(jdbcUrl, username, password);
            JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

            log.info("sqlBuilder.toString() : {}", sql);
            log.info("batchArgs rows: {}", batchArgs.size());
            jdbcTemplate.batchUpdate(sql, batchArgs);
            log.info("completed insert rows size: {}", batchArgs.size());
            return null;
        } catch (Exception e) {
            // Pass the exception itself as the last argument so the stack trace is logged,
            // then wrap exactly once so the cause chain stays short.
            log.error("Error while inserting multi-row data : {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }).subscribeOn(Schedulers.boundedElastic());
}

此方法执行时间很长，有时还会失败却不抛出任何异常。如何优化这段代码以提高性能？这里的数据量很大。

spring spring-boot snowflake-cloud-data-platform query-optimization jdbctemplate
1个回答
0
投票

您可以优化 `saveData` 方法：动态创建 COPY INTO 语句，并使用 Mono 异步执行操作，从而管理将所提供的数据暂存（stage）到 Snowflake 的过程。

根据提供的数据和目标表创建 `COPY INTO` 语句，并设置 `PreparedStatement` 的参数，以提高效率。

/**
 * Writes all rows of {@code dataMapList} to {@code targetObject} through one prepared
 * statement, executed on a {@code boundedElastic} worker.
 *
 * <p>The SQL text comes from {@code buildCopyIntoStatement}; every row's values are then bound
 * into that single statement by {@code setPreparedStatementParams}, which receives the row
 * index so it can offset its parameter positions.
 *
 * <p>NOTE(review): because every row is bound into one statement, the number of bind
 * parameters grows with rows x columns. Confirm the database's per-statement limits before
 * using this for very large inputs.
 *
 * @param dataMapList  rows to write, column name to value; {@code null} or empty is a no-op
 * @param jdbcUrl      JDBC URL passed to {@code getOrCreateDataSource}
 * @param username     user name passed to {@code getOrCreateDataSource}
 * @param password     password passed to {@code getOrCreateDataSource}
 * @param targetObject target table name
 * @return a {@code Mono} that completes empty on success, or errors with a
 *         {@code RuntimeException} wrapping the original failure
 */
public Mono<Void> saveData(List<Map<String, Object>> dataMapList, String jdbcUrl, String username, String password, String targetObject) {
    return Mono.<Void>fromCallable(() -> {
        // Nothing to write: return before a pooled connection is borrowed.
        // A null result makes fromCallable complete empty.
        if (dataMapList == null || dataMapList.isEmpty()) {
            return null;
        }

        HikariDataSource dataSource = getOrCreateDataSource(jdbcUrl, username, password);
        try (Connection conn = dataSource.getConnection()) {
            String copyIntoStatement = buildCopyIntoStatement(targetObject, dataMapList);
            try (PreparedStatement pstmt = conn.prepareStatement(copyIntoStatement)) {
                for (int row = 0; row < dataMapList.size(); row++) {
                    setPreparedStatementParams(pstmt, dataMapList.get(row), row);
                }
                // No addBatch() is ever called, so executeBatch() would have no queued
                // command to submit. The bound parameters live on this one statement, so
                // it is executed directly.
                pstmt.executeUpdate();
            }
            return null; // success
        } catch (Exception ex) {
            throw new RuntimeException("Error while saving data: " + ex.getMessage(), ex);
        }
    }).subscribeOn(Schedulers.boundedElastic());
}

/**
 * Builds the parameterized SQL that loads every row of {@code dataMapList} into
 * {@code targetObject} in one statement.
 *
 * <p>Despite the method name, the statement produced is a multi-row
 * {@code INSERT INTO ... VALUES (?, ...), (?, ...)}. Snowflake's {@code COPY INTO <table>}
 * reads from a stage or external location and has no {@code FROM VALUES} form, so it cannot
 * carry JDBC bind parameters the way the caller binds them.
 *
 * <p>Column names and their order come from the key set of the first map; one
 * parenthesized placeholder group is emitted per row.
 *
 * <p>NOTE(review): Snowflake documents an upper bound on the number of rows in a single
 * VALUES clause — confirm the current limit and split larger inputs across several statements.
 *
 * @param targetObject table name, inserted into the SQL text as-is
 * @param dataMapList  rows to load; must be non-empty and the first row must have columns
 * @return the SQL text with {@code rows x columns} positional placeholders
 * @throws IllegalArgumentException if {@code dataMapList} is null/empty or the first row has
 *         no columns
 */
private String buildCopyIntoStatement(String targetObject, List<Map<String, Object>> dataMapList) {
    if (dataMapList == null || dataMapList.isEmpty()) {
        throw new IllegalArgumentException("dataMapList must contain at least one row");
    }
    Map<String, Object> firstDataMap = dataMapList.get(0);
    if (firstDataMap.isEmpty()) {
        throw new IllegalArgumentException("First row has no columns; cannot build statement for " + targetObject);
    }

    String columnNames = String.join(", ", firstDataMap.keySet());

    // "(?, ?, ...)" for one row; identical for every row, so build it once.
    String rowPlaceholders = IntStream.range(0, firstDataMap.size())
            .mapToObj(i -> "?")
            .collect(Collectors.joining(", ", "(", ")"));

    String values = IntStream.range(0, dataMapList.size())
            .mapToObj(i -> rowPlaceholders)
            .collect(Collectors.joining(", "));

    return "INSERT INTO " + targetObject + " (" + columnNames + ") VALUES " + values;
}

/**
 * Binds one row's values onto {@code pstmt}, at the parameter positions reserved for that row
 * in a statement that carries the placeholders of all rows back to back.
 *
 * <p>The first position used is {@code rowIndex * data.size() + 1} (JDBC parameters are
 * 1-based); values are bound in the iteration order of {@code data.values()}.
 *
 * <p>NOTE(review): this assumes every row has the same number of entries, in the same
 * iteration order, as the row the statement's column list was built from — verify against
 * the caller (for example by using {@code LinkedHashMap} rows with identical keys).
 *
 * @param pstmt    statement to bind onto
 * @param data     the row, column name to value
 * @param rowIndex zero-based position of this row within the statement
 * @throws RuntimeException   wrapping any failure raised by {@code setObject}
 * @throws ArithmeticException if the computed parameter position overflows {@code int}
 */
private void setPreparedStatementParams(PreparedStatement pstmt, Map<String, Object> data, int rowIndex) {
    // Exact arithmetic: a silently wrapped index would bind values at the wrong position.
    int parameterIndex = Math.addExact(Math.multiplyExact(rowIndex, data.size()), 1);
    // A plain loop is required here: a lambda cannot increment a captured local variable.
    for (Object value : data.values()) {
        try {
            pstmt.setObject(parameterIndex, value);
        } catch (Exception ex) {
            throw new RuntimeException("Error while setting prepared statement parameter: " + ex.getMessage(), ex);
        }
        parameterIndex++;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.