How to export a large result set from a database into multiple CSV files and compress them?


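I found a very nice piece of code that exports a large number of rows from a database into multiple CSV files and zips them. I think it can help many developers. You can find the whole example at https://github.com/idaamit/stream-from-db/tree/master. The controller is: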

@GetMapping(value = "/employees/{employeeId}/cars")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<StreamingResponseBody> getEmployeeCars(@PathVariable int employeeId) {
    log.info("Going to export cars for employee {}", employeeId);
    String zipFileName = "Cars Of Employee - " + employeeId;
    return ResponseEntity.ok()
            .header(HttpHeaders.CONTENT_TYPE, "application/zip")
            // The file name contains spaces, so it has to be quoted
            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + zipFileName + ".zip\"")
            .body(employee.getCars(dataSource, employeeId));
}
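The controller only promises an application/zip body; the archive itself is a single ZipOutputStream into which one CSV entry per part is written, each entry repeating the header line. The JDK-only sketch below isolates that pattern outside Spring and JDBC, writing to an in-memory buffer so the result can be read back. It is a minimal illustration under those assumptions, not code from the repository: MultiCsvZipDemo, writeParts, entryNames and the part-N.csv naming are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical demo class: several CSV parts streamed into one zip archive
public class MultiCsvZipDemo {
    // Writes every part as its own .csv entry, each starting with the same header line.
    // The "part-N.csv" naming is an assumption for this sketch.
    static void writeParts(OutputStream out, String header, List<List<String>> parts) {
        // Closing the zip stream writes the central directory (and closes 'out')
        try (ZipOutputStream zip = new ZipOutputStream(out)) {
            int partNumber = 0;
            for (List<String> rows : parts) {
                partNumber++;
                zip.putNextEntry(new ZipEntry("part-" + partNumber + ".csv"));
                zip.write((header + "\n").getBytes(StandardCharsets.UTF_8));
                for (String row : rows) {
                    zip.write((row + "\n").getBytes(StandardCharsets.UTF_8));
                }
                zip.closeEntry();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Lists the entry names, i.e. what a client would see after unpacking the download
    static List<String> entryNames(byte[] zipBytes) {
        List<String> names = new ArrayList<>();
        try (ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
                names.add(entry.getName());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeParts(buffer, "Id,employeeId,type,text1",
                List.of(List.of("1,7,sedan,a", "2,7,suv,b"), List.of("3,7,van,c")));
        System.out.println(entryNames(buffer.toByteArray())); // [part-1.csv, part-2.csv]
    }
}
```

In the Spring version the OutputStream is the one handed to StreamingResponseBody.writeTo, so entries reach the client as they are written instead of being buffered in memory.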

In the Employee class, we first check whether more than one CSV file needs to be prepared:

import javax.sql.DataSource;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

public class Employee {

    // Accepts any DataSource (a BasicDataSource still fits). Spring invokes the returned
    // body asynchronously, after the controller method has returned.
    public StreamingResponseBody getCars(DataSource dataSource, int employeeId) {
        return outputStream -> {
            JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
            String sqlQuery = "SELECT [Id], [employeeId], [type], [text1] " +
                    "FROM Cars " +
                    "WHERE EmployeeID=? ";
            PreparedStatementSetter preparedStatementSetter = ps -> ps.setInt(1, employeeId);
            StreamingZipResultSetExtractor zipExtractor = new StreamingZipResultSetExtractor(
                    outputStream, employeeId, isMoreThanOneFile(jdbcTemplate, employeeId));
            jdbcTemplate.query(sqlQuery, preparedStatementSetter, zipExtractor);
        };
    }

    // Each CSV holds MAX_ROWS_IN_CSV lines including its header (MAX_ROWS_IN_CSV - 1 data rows),
    // so a second file is needed from MAX_ROWS_IN_CSV rows on. The count is a separate query,
    // so rows inserted between the two queries are not reflected in this decision.
    private boolean isMoreThanOneFile(JdbcTemplate jdbcTemplate, int employeeId) {
        Integer numberOfCars = getCount(jdbcTemplate, employeeId);
        return numberOfCars != null && numberOfCars >= StreamingZipResultSetExtractor.MAX_ROWS_IN_CSV;
    }

    private Integer getCount(JdbcTemplate jdbcTemplate, int employeeId) {
        String sqlQuery = "SELECT count([Id]) " +
                "FROM Cars " +
                "WHERE EmployeeID=? ";
        // Varargs overload; the Object[] variant is deprecated since Spring 5.3
        return jdbcTemplate.queryForObject(sqlQuery, Integer.class, employeeId);
    }
}
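The >= in isMoreThanOneFile follows from the lineCount bookkeeping in StreamingZipResultSetExtractor.extractData: the counter includes one header line per entry, so each entry ends up with MAX_ROWS_IN_CSV lines, that is MAX_ROWS_IN_CSV - 1 data rows. The JDK-only simulation below replays that counter logic without a database to show how rows are distributed over the parts. SplitSimulation and rowsPerPart are hypothetical names; the loop is a transcription of the extractor's counting, not code from the repository.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the lineCount logic of StreamingZipResultSetExtractor
public class SplitSimulation {
    // Returns the number of data rows that end up in each zip entry
    static List<Integer> rowsPerPart(int totalRows, int maxRowsInCsv) {
        List<Integer> parts = new ArrayList<>();
        parts.add(0);          // the first entry is created before the loop
        int lineCount = 1;     // header line of the first entry
        for (int row = 0; row < totalRows; row++) {
            if (lineCount % maxRowsInCsv == 0) {
                parts.add(0);  // closeEntry() + createFile(...)
                lineCount++;   // header line of the new entry
            }
            lineCount++;       // the data row itself
            int last = parts.size() - 1;
            parts.set(last, parts.get(last) + 1);
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(rowsPerPart(9, 10));  // [9]
        System.out.println(rowsPerPart(10, 10)); // [9, 1]
        System.out.println(rowsPerPart(25, 10)); // [9, 9, 7]
    }
}
```

With the sample value MAX_ROWS_IN_CSV = 10, nine rows still fit in one file, while ten rows already produce a second part, which is exactly when isMoreThanOneFile switches to numbered "Part N" file names.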

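The StreamingZipResultSetExtractor class is responsible for splitting the CSV stream into multiple files and zipping them:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;

@Slf4j
public class StreamingZipResultSetExtractor implements ResultSetExtractor<Integer> {
    private static final int CHUNK_SIZE = 100000;
    // Lines per CSV entry including its header, i.e. MAX_ROWS_IN_CSV - 1 data rows per file
    public static final int MAX_ROWS_IN_CSV = 10;
    private final OutputStream outputStream;
    private final int employeeId;
    private final StreamingCsvResultSetExtractor streamingCsvResultSetExtractor;
    private final boolean isInteractionCountExceedsLimit;
    private int fileCount = 0;

    public StreamingZipResultSetExtractor(OutputStream outputStream, int employeeId, boolean isInteractionCountExceedsLimit) {
        this.outputStream = outputStream;
        this.employeeId = employeeId;
        this.streamingCsvResultSetExtractor = new StreamingCsvResultSetExtractor(employeeId);
        this.isInteractionCountExceedsLimit = isInteractionCountExceedsLimit;
    }

    @Override
    public Integer extractData(ResultSet resultSet) throws DataAccessException {
        log.info("Creating thread to extract data as zip file for employeeId {}", employeeId);
        int lineCount = 1; // +1 for header row
        // The CSV extractor hands back a pipe that is already connected to its writer thread.
        // Closing it here also unblocks the writer if zipping fails half-way.
        try (InputStream csvInputStream = streamingCsvResultSetExtractor.extractData(resultSet);
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(csvInputStream, StandardCharsets.UTF_8))) {

            String currentLine;
            String header = bufferedReader.readLine() + "\n";
            try (ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream)) {
                createFile(employeeId, zipOutputStream, header);
                while ((currentLine = bufferedReader.readLine()) != null) {
                    // The current entry already holds MAX_ROWS_IN_CSV lines: start the next part
                    if (lineCount % MAX_ROWS_IN_CSV == 0) {
                        zipOutputStream.closeEntry();
                        createFile(employeeId, zipOutputStream, header);
                        lineCount++; // header line of the new entry
                    }
                    lineCount++;
                    zipOutputStream.write((currentLine + "\n").getBytes(StandardCharsets.UTF_8));
                    if (lineCount % CHUNK_SIZE == 0) {
                        zipOutputStream.flush();
                    }
                }
            }
        } catch (IOException e) {
            log.error("Task {} could not zip search results", employeeId, e);
        }

        log.info("Finished zipping all lines to {} file(s) - total of {} lines of data for task {}", fileCount, lineCount - fileCount, employeeId);
        return lineCount;
    }

    private void createFile(int employeeId, ZipOutputStream zipOutputStream, String header) {
        // Count every entry so that lineCount - fileCount (one header per file) is the data row count
        fileCount++;
        String fileName = "Cars for Employee - " + employeeId;
        if (isInteractionCountExceedsLimit) {
            fileName += " Part " + fileCount;
        }
        try {
            zipOutputStream.putNextEntry(new ZipEntry(fileName + ".csv"));
            zipOutputStream.write(header.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            log.error("Could not create new zip entry for task {} ", employeeId, e);
        }
    }

}

The StreamingCsvResultSetExtractor class is responsible for streaming the data from the result set as CSV. There is still more work to do to handle special characters that are problematic in CSV cells.

import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;

@Slf4j
public class StreamingCsvResultSetExtractor implements ResultSetExtractor<PipedInputStream> {
    private static final int CHUNK_SIZE = 100000;
    private PipedOutputStream pipedOutputStream;
    private final int employeeId;

    public StreamingCsvResultSetExtractor(int employeeId) {
        this.employeeId = employeeId;
    }

    @SneakyThrows
    @Override
    public PipedInputStream extractData(ResultSet resultSet) throws DataAccessException {
        log.info("Creating thread to extract data as csv and save to file for task {}", employeeId);
        this.pipedOutputStream = new PipedOutputStream();
        // Connect the reading end BEFORE the writer thread starts: writing to an
        // unconnected PipedOutputStream fails with "Pipe not connected"
        PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                prepareCsv(resultSet);
            } catch (Exception e) {
                log.error("Could not write csv lines for task {}", employeeId, e);
            }
        });
        // No further tasks: let the worker thread terminate instead of leaking it
        executor.shutdown();
        return pipedInputStream;
    }

    @SneakyThrows
    private Integer prepareCsv(ResultSet resultSet) {
        int interactionsSent = 1;
        log.info("starting to extract data to csv lines");
        StringBuilder csvRowBuilder = new StringBuilder();
        try {
            // Inside the try so that the pipe is closed even if reading the metadata fails
            streamHeaders(resultSet.getMetaData());
            int columnCount = resultSet.getMetaData().getColumnCount();
            while (resultSet.next()) {
                for (int i = 1; i <= columnCount; i++) {
                    csvRowBuilder.append(toCsvCell(resultSet.getString(i))).append(",");
                }
                // Replace the trailing comma with the line terminator
                int rowLength = csvRowBuilder.length();
                csvRowBuilder.replace(rowLength - 1, rowLength, "\n");

                pipedOutputStream.write(csvRowBuilder.toString().getBytes(StandardCharsets.UTF_8));
                interactionsSent++;
                csvRowBuilder.setLength(0);
                if (interactionsSent % CHUNK_SIZE == 0) {
                    pipedOutputStream.flush();
                }
            }
        } finally {
            // Closing the writing end is what makes readLine() return null on the reading side
            pipedOutputStream.flush();
            pipedOutputStream.close();
        }

        log.debug("Created all csv lines for Task {} - total of {} rows", employeeId, interactionsSent);
        return interactionsSent;
    }

    // SQL NULL becomes an empty cell instead of the text "null". Values containing a comma,
    // quote or line break are quoted with embedded quotes doubled (RFC 4180). The zip
    // extractor's readLine() loop still treats a quoted line break as two lines, though,
    // so a part boundary can fall inside such a value.
    private static String toCsvCell(String value) {
        if (value == null) {
            return "";
        }
        if (value.contains(",") || value.contains("\"") || value.contains("\n") || value.contains("\r")) {
            return "\"" + value.replace("\"", "\"\"") + "\"";
        }
        return value;
    }

    @SneakyThrows
    private void streamHeaders(ResultSetMetaData resultSetMetaData) {
        StringBuilder headersCsvBuilder = new StringBuilder();

        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
            headersCsvBuilder.append(resultSetMetaData.getColumnLabel(i)).append(",");
        }
        int rowLength = headersCsvBuilder.length();
        headersCsvBuilder.replace(rowLength - 1, rowLength, "\n");

        pipedOutputStream.write(headersCsvBuilder.toString().getBytes(StandardCharsets.UTF_8));
    }

}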
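The CSV extractor and the zip extractor talk through a PipedOutputStream/PipedInputStream pair: a background thread produces CSV lines while the request thread consumes them with readLine(). The JDK-only sketch below isolates that handoff so the two rules it depends on are visible: the input end must be connected before the producer writes anything, and the producer must close the output end so the consumer sees end-of-stream. It is an illustration under those assumptions, not code from the repository; PipeHandoffDemo and produceAndConsume are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: one producer thread, the calling thread as consumer
public class PipeHandoffDemo {
    static List<String> produceAndConsume(List<String> lines) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            PipedOutputStream out = new PipedOutputStream();
            // Connect first, so the producer can never hit "Pipe not connected"
            PipedInputStream in = new PipedInputStream(out);
            executor.execute(() -> {
                // Closing the writer signals end-of-stream to the reader
                try (PipedOutputStream writer = out) {
                    for (String line : lines) {
                        writer.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            List<String> received = new ArrayList<>();
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    received.add(line);
                }
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            executor.shutdown(); // the worker thread exits once its task is done
        }
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(List.of("Id,type", "1,sedan", "2,suv")).size()); // 3
    }
}
```

The pipe's internal buffer is small (1024 bytes by default), so the producer blocks whenever the consumer falls behind; that back-pressure is what keeps memory use flat for large result sets, but it also means the consumer must close its end on failure, or the producer waits forever.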
spring-boot csv stream zip jdbctemplate
1 Answer

You can find the whole example at: https://github.com/idaamit/stream-from-db/tree/master

© www.soinside.com 2019 - 2024. All rights reserved.