如何在使用 Spring Boot 的 JdbcTemplate 时，优化向 Snowflake 数据库批量插入数百万条记录的性能？
/**
 * Inserts every row of {@code dataMapList} into {@code targetObject} with one
 * {@code JdbcTemplate.batchUpdate} call, scheduled on the bounded-elastic scheduler.
 *
 * <p>The column list is the key set of the first map; each row's values are looked up by
 * those keys, so a key that is absent from a later row is bound as {@code null}.
 *
 * <p>The work is wrapped in {@code Mono.fromCallable}, so it only starts when the returned
 * {@code Mono} is subscribed to, and any failure is delivered as the Mono's error signal.
 *
 * <p>NOTE(review): the table name and column names are concatenated into the SQL text
 * unescaped; they must not come from untrusted input.
 *
 * @param dataMapList  rows to insert; {@code null} or empty completes without touching the database
 * @param jdbcUrl      JDBC URL used to look up the data source and to resolve the target name
 * @param username     database user
 * @param password     database password
 * @param targetObject name of the target table, resolved through {@code dbSchema}
 * @return a {@code Mono} that completes empty on success
 * @throws RuntimeException (as the Mono's error) wrapping any failure raised while inserting
 */
public Mono<Void> saveData(List<Map<String, Object>> dataMapList, String jdbcUrl, String username, String password, String targetObject) {
    return Mono.<Void>fromCallable(() -> {
        // Nothing to insert: finish before touching the connection pool.
        if (dataMapList == null || dataMapList.isEmpty()) {
            return null; // a null callable result completes the Mono empty
        }

        HikariDataSource dataSource = getOrCreateDataSource(jdbcUrl, username, password);
        try {
            // JdbcTemplate borrows and returns its own pooled connection; checking one out
            // manually here would only keep a second connection idle for the whole insert.
            JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

            // Column order is fixed by the first row.
            List<String> columnNames = new ArrayList<>(dataMapList.get(0).keySet());
            if (columnNames.isEmpty()) {
                throw new IllegalArgumentException("First row has no columns to insert");
            }

            StringBuilder placeholders = new StringBuilder();
            for (int i = 0; i < columnNames.size(); i++) {
                placeholders.append(i == 0 ? "?" : ", ?");
            }
            String sql = "INSERT INTO " + dbSchema(jdbcUrl, targetObject) + "("
                    + String.join(", ", columnNames) + ") VALUES (" + placeholders + ")";

            // One Object[] per row, values in column order.
            List<Object[]> batchArgs = new ArrayList<>(dataMapList.size());
            for (Map<String, Object> data : dataMapList) {
                batchArgs.add(columnNames.stream().map(data::get).toArray());
            }

            log.info("sqlBuilder.toString() : {}", sql);
            log.info("batchArgs rows: {}", batchArgs.size());
            jdbcTemplate.batchUpdate(sql, batchArgs);
            log.info("completed insert rows size: {}", batchArgs.size());
            return null;
        } catch (Exception e) {
            // Log once, with the stack trace, and wrap once so the cause chain stays short.
            log.error("Error while inserting multi-row data : {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }).subscribeOn(Schedulers.boundedElastic()).then();
}
此方法执行时间很长，或者失败时不给出任何异常。这里的数据量很大，如何优化这段代码以提高性能？
您可以优化 `saveData` 方法：动态创建 `COPY INTO` 语句，并使用 Mono 异步执行操作，以管理将所提供的数据暂存到 Snowflake 的过程。
根据提供的数据和目标表创建 `COPY INTO` 语句；还可以设置 `PreparedStatement` 参数，以提高效率。
/**
 * Writes all rows of {@code dataMapList} to {@code targetObject} with one prepared statement
 * whose text comes from {@code buildCopyIntoStatement} and whose parameters are bound row by
 * row through {@code setPreparedStatementParams}. Runs on the bounded-elastic scheduler.
 *
 * <p>The work is wrapped in {@code Mono.fromCallable}, so it only starts when the returned
 * {@code Mono} is subscribed to, and any failure is delivered as the Mono's error signal.
 *
 * @param dataMapList  rows to write; {@code null} or empty completes without touching the database
 * @param jdbcUrl      JDBC URL used to look up the data source
 * @param username     database user
 * @param password     database password
 * @param targetObject name of the target table
 * @return a {@code Mono} that completes empty on success
 * @throws RuntimeException (as the Mono's error) wrapping any failure raised while saving
 */
public Mono<Void> saveData(List<Map<String, Object>> dataMapList, String jdbcUrl, String username, String password, String targetObject) {
    return Mono.<Void>fromCallable(() -> {
        // Nothing to write: finish before touching the connection pool.
        if (dataMapList == null || dataMapList.isEmpty()) {
            return null; // a null callable result completes the Mono empty
        }

        HikariDataSource dataSource = getOrCreateDataSource(jdbcUrl, username, password);
        try {
            // Build the statement text before borrowing a connection so the connection
            // is held only while it is actually used.
            String copyIntoStatement = buildCopyIntoStatement(targetObject, dataMapList);
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement pstmt = conn.prepareStatement(copyIntoStatement)) {
                for (int rowIndex = 0; rowIndex < dataMapList.size(); rowIndex++) {
                    setPreparedStatementParams(pstmt, dataMapList.get(rowIndex), rowIndex);
                }
                // All rows are bound onto this single statement and addBatch() is never
                // called, so executeBatch() would submit an empty batch and silently do
                // nothing. Execute the statement itself instead.
                pstmt.execute();
            }
            return null;
        } catch (Exception ex) {
            throw new RuntimeException("Error while saving data: " + ex.getMessage(), ex);
        }
    }).subscribeOn(Schedulers.boundedElastic()).then();
}
/**
 * Builds a single parameterized multi-row statement of the form
 * {@code INSERT INTO <target> (c1, c2) VALUES (?, ?), (?, ?), ...} with one placeholder
 * group per element of {@code dataMapList}.
 *
 * <p>The method keeps its historical name, but it emits {@code INSERT INTO ... VALUES}:
 * Snowflake's {@code COPY INTO <table>} loads from a stage or location and has no
 * {@code FROM VALUES (?, ...)} form, so the previous text could not be executed.
 *
 * <p>Column names and their order come from the key set of the first map.
 *
 * <p>NOTE(review): the statement grows with the number of rows; for very large inputs the
 * caller should split the data and confirm Snowflake's limits on VALUES rows and bind
 * variables. The table and column names are concatenated unescaped and must not come from
 * untrusted input.
 *
 * @param targetObject name of the target table
 * @param dataMapList  rows to insert; must contain at least one map
 * @return the SQL text with {@code dataMapList.size()} placeholder groups
 * @throws IllegalArgumentException if {@code dataMapList} is {@code null} or empty
 */
private String buildCopyIntoStatement(String targetObject, List<Map<String, Object>> dataMapList) {
    if (dataMapList == null || dataMapList.isEmpty()) {
        throw new IllegalArgumentException("dataMapList must contain at least one row");
    }
    Map<String, Object> firstDataMap = dataMapList.get(0);
    String columnNames = String.join(", ", firstDataMap.keySet());
    // One "(?, ?, ...)" group, reused for every row.
    String rowPlaceholders = firstDataMap.keySet().stream()
            .map(column -> "?")
            .collect(Collectors.joining(", ", "(", ")"));
    String values = IntStream.range(0, dataMapList.size())
            .mapToObj(i -> rowPlaceholders)
            .collect(Collectors.joining(", "));
    return "INSERT INTO " + targetObject + " (" + columnNames + ") VALUES " + values;
}
/**
 * Binds the values of one row onto {@code pstmt}, in the iteration order of
 * {@code data.values()}, starting at the 1-based parameter index
 * {@code rowIndex * data.size() + 1}.
 *
 * <p>A plain loop is used because the running index has to be incremented after each bind;
 * a lambda cannot modify a captured local variable.
 *
 * <p>NOTE(review): the offset assumes every row has the same number of entries, and in the
 * same order, as the row used to build the statement's column list — confirm against callers.
 *
 * @param pstmt    statement whose placeholders are being filled
 * @param data     the row's column-to-value map
 * @param rowIndex zero-based position of the row within the statement's placeholder groups
 * @throws RuntimeException wrapping any exception raised while binding a value
 */
private void setPreparedStatementParams(PreparedStatement pstmt, Map<String, Object> data, int rowIndex) {
    int parameterIndex = rowIndex * data.size() + 1;
    try {
        for (Object value : data.values()) {
            pstmt.setObject(parameterIndex++, value);
        }
    } catch (Exception ex) {
        throw new RuntimeException("Error while setting prepared statement parameter: " + ex.getMessage(), ex);
    }
}