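I found a very nice piece of code that exports a large number of rows from a database into multiple CSV files and zips them. I think it can help a lot of developers. You can find the whole example at: https://github.com/idaamit/stream-from-db/tree/master
The controller is: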
@GetMapping(value = "/employees/{employeeId}/cars")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<StreamingResponseBody> getEmployeeCars(@PathVariable int employeeId) {
log.info("Going to export cars for employee {}", employeeId);
String zipFileName = "Cars Of Employee - " + employeeId;
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_TYPE, "application/zip")
// The file name contains spaces, so it is quoted inside the header value
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + zipFileName + ".zip\"")
.body(
employee.getCars(dataSource, employeeId));
}
In the Employee class, we first check whether more than one CSV file needs to be prepared:
public class Employee {
public StreamingResponseBody getCars(BasicDataSource dataSource, int employeeId) {
StreamingResponseBody streamingResponseBody = new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
String sqlQuery = "SELECT [Id], [employeeId], [type], [text1] " +
"FROM Cars " +
"WHERE EmployeeID=? ";
PreparedStatementSetter preparedStatementSetter = new PreparedStatementSetter() {
public void setValues(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setInt(1, employeeId);
}
};
StreamingZipResultSetExtractor zipExtractor = new StreamingZipResultSetExtractor(outputStream, employeeId, isMoreThanOneFile(jdbcTemplate, employeeId));
Integer numberOfInteractionsSent = jdbcTemplate.query(sqlQuery, preparedStatementSetter, zipExtractor);
}
};
return streamingResponseBody;
}
private boolean isMoreThanOneFile(JdbcTemplate jdbcTemplate, int employeeId) {
Integer numberOfCars = getCount(jdbcTemplate, employeeId);
return numberOfCars >= StreamingZipResultSetExtractor.MAX_ROWS_IN_CSV;
}
private Integer getCount(JdbcTemplate jdbcTemplate, int employeeId) {
String sqlQuery = "SELECT count([Id]) " +
"FROM Cars " +
"WHERE EmployeeID=? ";
return jdbcTemplate.queryForObject(sqlQuery, new Object[] { employeeId }, Integer.class);
}
}
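The `>=` comparison in isMoreThanOneFile makes more sense once the header row is counted. Going by the splitting loop in StreamingZipResultSetExtractor, each CSV seems to hold MAX_ROWS_IN_CSV lines including its header, which leaves MAX_ROWS_IN_CSV - 1 data rows per file. Here is a minimal sketch of that arithmetic under this reading; the class CsvPartMath and its method names are hypothetical and are not part of the repository.

```java
// Hypothetical helper (not in the original repository): mirrors the row/file arithmetic
// that the Employee and StreamingZipResultSetExtractor classes appear to rely on.
class CsvPartMath {

    // Each CSV holds maxLinesPerCsv lines in total, one of which is the header.
    static int dataRowsPerFile(int maxLinesPerCsv) {
        if (maxLinesPerCsv < 2) {
            throw new IllegalArgumentException("need room for the header and at least one row");
        }
        return maxLinesPerCsv - 1;
    }

    // Number of CSV entries the zip would contain for a given row count.
    static int filesNeeded(int rowCount, int maxLinesPerCsv) {
        int perFile = dataRowsPerFile(maxLinesPerCsv);
        if (rowCount <= 0) {
            return 1; // a single CSV that contains only the header
        }
        return (rowCount + perFile - 1) / perFile; // ceiling division
    }

    // Same comparison as Employee.isMoreThanOneFile, with the limit passed in.
    static boolean isMoreThanOneFile(int rowCount, int maxLinesPerCsv) {
        return rowCount >= maxLinesPerCsv;
    }

    public static void main(String[] args) {
        for (int rows : new int[] {0, 9, 10, 18, 19}) {
            System.out.println(rows + " rows -> " + filesNeeded(rows, 10)
                    + " file(s), part suffix needed: " + isMoreThanOneFile(rows, 10));
        }
    }
}
```

With a limit of 10, nine cars still fit in one file and the tenth car starts a second one, which is why the check uses `>=` rather than `>`.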
The class StreamingZipResultSetExtractor is responsible for splitting the CSV stream data into multiple files and zipping them.
@Slf4j
public class StreamingZipResultSetExtractor implements ResultSetExtractor<Integer> {
private final static int CHUNK_SIZE = 100000;
public final static int MAX_ROWS_IN_CSV = 10;
private OutputStream outputStream;
private int employeeId;
private StreamingCsvResultSetExtractor streamingCsvResultSetExtractor;
private boolean isInteractionCountExceedsLimit;
private int fileCount = 0;
public StreamingZipResultSetExtractor(OutputStream outputStream, int employeeId, boolean isInteractionCountExceedsLimit) {
this.outputStream = outputStream;
this.employeeId = employeeId;
this.streamingCsvResultSetExtractor = new StreamingCsvResultSetExtractor(employeeId);
this.isInteractionCountExceedsLimit = isInteractionCountExceedsLimit;
}
@Override
@SneakyThrows
public Integer extractData(ResultSet resultSet) throws DataAccessException {
log.info("Creating thread to extract data as zip file for employeeId {}", employeeId);
int lineCount = 1; //+1 for header row
try (PipedOutputStream internalOutputStream = streamingCsvResultSetExtractor.extractData(resultSet);
PipedInputStream pipedInputStream = new PipedInputStream(internalOutputStream);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pipedInputStream))) {
String currentLine;
String header = bufferedReader.readLine() + "\n";
try (ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream)) {
createFile(employeeId, zipOutputStream, header);
while ((currentLine = bufferedReader.readLine()) != null) {
if (lineCount % MAX_ROWS_IN_CSV == 0) {
zipOutputStream.closeEntry();
createFile(employeeId, zipOutputStream, header);
lineCount++;
}
lineCount++;
currentLine += "\n";
zipOutputStream.write(currentLine.getBytes());
if (lineCount % CHUNK_SIZE == 0) {
zipOutputStream.flush();
}
}
}
} catch (IOException e) {
log.error("Task {} could not zip search results", employeeId, e);
}
log.info("Finished zipping all lines to {} file(s) - total of {} lines of data for task {}", fileCount, lineCount - fileCount, employeeId);
return lineCount;
}
private void createFile(int employeeId, ZipOutputStream zipOutputStream, String header) {
String fileName = "Cars for Employee - " + employeeId;
if (isInteractionCountExceedsLimit) {
fileCount++;
fileName += " Part " + fileCount;
}
try {
zipOutputStream.putNextEntry(new ZipEntry(fileName + ".csv"));
zipOutputStream.write(header.getBytes());
} catch (IOException e) {
log.error("Could not create new zip entry for task {} ", employeeId, e);
}
}
}
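To see the splitting idea without Spring or a database, here is a minimal standalone sketch that uses only the JDK. It reads CSV lines from a reader, writes them into zip entries, and repeats the header at the top of every entry. The class ZipSplitSketch, its method names and the sample data are hypothetical. Unlike the class above, it counts data rows directly, always adds the " Part N" suffix, and encodes explicitly as UTF-8. It assumes Java 9 or later because of `readAllBytes`.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical sketch (not in the original repository).
class ZipSplitSketch {

    // Streams CSV lines into zip entries of at most maxRowsPerPart data rows each.
    // Returns the number of entries written. Closes the target stream when done.
    static int zipInParts(BufferedReader csv, OutputStream target, String baseName,
                          int maxRowsPerPart) throws IOException {
        if (maxRowsPerPart < 1) {
            throw new IllegalArgumentException("maxRowsPerPart must be at least 1");
        }
        String header = csv.readLine();
        if (header == null) {
            throw new IOException("CSV stream has no header line");
        }
        try (ZipOutputStream zip = new ZipOutputStream(target)) {
            int parts = 1;
            int rowsInPart = 0;
            openPart(zip, baseName, parts, header);
            String row;
            while ((row = csv.readLine()) != null) {
                if (rowsInPart == maxRowsPerPart) {
                    // Current entry is full: close it and start the next one with the header.
                    zip.closeEntry();
                    parts++;
                    openPart(zip, baseName, parts, header);
                    rowsInPart = 0;
                }
                writeLine(zip, row);
                rowsInPart++;
            }
            zip.closeEntry();
            return parts;
        }
    }

    private static void openPart(ZipOutputStream zip, String baseName, int part, String header)
            throws IOException {
        zip.putNextEntry(new ZipEntry(baseName + " Part " + part + ".csv"));
        writeLine(zip, header);
    }

    private static void writeLine(OutputStream out, String line) throws IOException {
        out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
    }

    // Demo helper: zips the given CSV text in memory and reads the entries back.
    static Map<String, String> zipAndReadBack(String csvText, String baseName, int maxRowsPerPart) {
        try {
            ByteArrayOutputStream zipped = new ByteArrayOutputStream();
            zipInParts(new BufferedReader(new StringReader(csvText)), zipped, baseName, maxRowsPerPart);
            Map<String, String> entries = new LinkedHashMap<>();
            try (ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipped.toByteArray()))) {
                ZipEntry entry;
                while ((entry = in.getNextEntry()) != null) {
                    entries.put(entry.getName(), new String(in.readAllBytes(), StandardCharsets.UTF_8));
                }
            }
            return entries;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        zipAndReadBack("Id,type\n1,suv\n2,van\n3,bus\n", "Cars", 2)
                .forEach((name, content) -> System.out.println(name + " -> " + content.replace("\n", "\\n")));
    }
}
```

Because only one line is held in memory at a time, the same loop would work whether the reader is backed by a string, a file, or a pipe fed from a result set.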
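The class StreamingCsvResultSetExtractor is responsible for transferring the data from the result set into CSV lines. More work is still needed to handle special characters that are problematic inside a CSV cell.
@Slf4j
public class StreamingCsvResultSetExtractor implements ResultSetExtractor<PipedOutputStream> {
private final static int CHUNK_SIZE = 100000;
private PipedOutputStream pipedOutputStream;
private final int employeeId;
public StreamingCsvResultSetExtractor(int employeeId) {
this.employeeId = employeeId;
}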
@SneakyThrows
@Override
public PipedOutputStream extractData(ResultSet resultSet) throws DataAccessException {
log.info("Creating thread to extract data as csv and save to file for task {}", employeeId);
this.pipedOutputStream = new PipedOutputStream();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
prepareCsv(resultSet);
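});
// No further tasks will be submitted; this lets the worker thread exit once the CSV is written
executor.shutdown();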
return pipedOutputStream;
}
@SneakyThrows
private Integer prepareCsv(ResultSet resultSet) {
int interactionsSent = 1;
log.info("starting to extract data to csv lines");
streamHeaders(resultSet.getMetaData());
StringBuilder csvRowBuilder = new StringBuilder();
try {
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
for (int i = 1; i < columnCount + 1; i++) {
if(resultSet.getString(i) != null && resultSet.getString(i).contains(",")){
String strToAppend = "\"" + resultSet.getString(i) + "\"";
csvRowBuilder.append(strToAppend);
} else {
csvRowBuilder.append(resultSet.getString(i));
}
csvRowBuilder.append(",");
}
int rowLength = csvRowBuilder.length();
csvRowBuilder.replace(rowLength - 1, rowLength, "\n");
pipedOutputStream.write(csvRowBuilder.toString().getBytes());
interactionsSent++;
csvRowBuilder.setLength(0);
if (interactionsSent % CHUNK_SIZE == 0) {
pipedOutputStream.flush();
}
}
} finally {
pipedOutputStream.flush();
pipedOutputStream.close();
}
log.debug("Created all csv lines for Task {} - total of {} rows", employeeId, interactionsSent);
return interactionsSent;
}
@SneakyThrows
private void streamHeaders(ResultSetMetaData resultSetMetaData) {
StringBuilder headersCsvBuilder = new StringBuilder();
for (int i = 1; i < resultSetMetaData.getColumnCount() + 1; i++) {
headersCsvBuilder.append(resultSetMetaData.getColumnLabel(i)).append(",");
}
int rowLength = headersCsvBuilder.length();
headersCsvBuilder.replace(rowLength - 1, rowLength, "\n");
pipedOutputStream.write(headersCsvBuilder.toString().getBytes());
}
}
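As for the special characters mentioned above, one common way to handle them is the RFC 4180 convention: a field that contains a comma, a double quote, or a line break is wrapped in double quotes, and any double quote inside it is doubled. Here is a minimal sketch of that rule. The class CsvEscapeSketch and its methods are hypothetical and are not part of the repository. Writing null as an empty cell is an assumption of this sketch; the class above would write the text "null" instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical sketch (not in the original repository): RFC 4180 style field escaping.
class CsvEscapeSketch {

    // Quotes a single field when it contains a comma, a double quote, CR or LF.
    static String escape(String value) {
        if (value == null) {
            return ""; // assumption: null becomes an empty cell
        }
        boolean needsQuotes = value.contains(",") || value.contains("\"")
                || value.contains("\n") || value.contains("\r");
        if (!needsQuotes) {
            return value;
        }
        // Double every embedded quote, then wrap the whole field in quotes.
        return "\"" + value.replace("\"", "\"\"") + "\"";
    }

    // Joins escaped fields with commas and terminates the row with a newline.
    static String toRow(List<String> fields) {
        StringJoiner row = new StringJoiner(",", "", "\n");
        for (String field : fields) {
            row.add(escape(field));
        }
        return row.toString();
    }

    public static void main(String[] args) {
        System.out.print(toRow(Arrays.asList("1", "Ford, used", "6\" lift kit", null)));
    }
}
```

One caveat: once a quoted field may contain a line break, splitting the stream with `readLine()` as the zip extractor does can cut a row in half, so the splitter would then need to count rows rather than physical lines.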