Spring Boot + Spring Data JPA + PostgreSQL

问题描述 投票:0回答:1

我在 Spring Boot 3 中有以下逻辑

控制器层:

```
/**
 * REST controller exposing the proof-of-concept endpoint that triggers the
 * accumulated-premium load into PostgreSQL.
 */
@RestController
@RequestMapping("option")
public class OptionController {

    private final OptionService optionService;

    @Autowired
    public OptionController(OptionService optionService) {
        this.optionService = optionService;
    }

    /**
     * Runs the load synchronously through {@link OptionService#saveAllAccumulatedPremiums()}
     * and answers once that call has returned.
     *
     * @return HTTP 200 with a fixed confirmation message
     */
    @GetMapping("POCPostgreSQL")
    public ResponseEntity<String> POCSaveListOfAccumulatedPremiumsToPostgreSQL() {
        // The service method is declared without parameters, so no list is passed here.
        optionService.saveAllAccumulatedPremiums();
        return ResponseEntity.ok("POCPostgreSQL is done");
    }
}
```

服务层:

```
/**
 * Service contract for persisting accumulated premiums.
 */
public interface OptionService {
    /**
     * Saves all accumulated premiums. The method takes no arguments and returns nothing;
     * where the data comes from is left to the implementation.
     */
    void saveAllAccumulatedPremiums();
}


/**
 * {@link OptionService} implementation that walks the {@code /data} directory, parses every
 * {@code .csv} file it finds into {@link AccumulatedPremium} entities and saves them through
 * {@link OptionPostgreSQLRepository}.
 */
@Service
public class OptionServiceImpl implements OptionService {

    /**
     * CSV layout shared by all input files: the first record is the header and is skipped,
     * header names are matched case-insensitively, and values are trimmed.
     * Built with the builder API; equivalent to the deprecated
     * {@code withFirstRecordAsHeader().withIgnoreHeaderCase().withTrim()} chain.
     */
    private static final CSVFormat CSV_FORMAT = CSVFormat.DEFAULT.builder()
            .setHeader()
            .setSkipHeaderRecord(true)
            .setIgnoreHeaderCase(true)
            .setTrim(true)
            .build();

    private final OptionPostgreSQLRepository optionPostgreSQLRepository;

    @Autowired
    public OptionServiceImpl(OptionPostgreSQLRepository optionPostgreSQLRepository) {
        this.optionPostgreSQLRepository = optionPostgreSQLRepository;
    }

    @Override
    public void saveAllAccumulatedPremiums() {
        POC_ETL_ALL_AccumulatedPremium();
    }

    /**
     * Loads every CSV file below {@code /data}. The direct sub-folders are processed in
     * folder-name order, and the files inside each folder in file-name order.
     * I/O failures are reported on standard error and do not propagate to the caller.
     */
    public void POC_ETL_ALL_AccumulatedPremium() {
        Path startPath = Paths.get("/data");
        List<Path> sortedFolders;
        try (Stream<Path> folderPaths = Files.walk(startPath, 1)) {
            sortedFolders = folderPaths
                    .filter(Files::isDirectory)  // only consider directories
                    .filter(path -> !path.equals(startPath))  // exclude the startPath itself
                    .sorted(Comparator.comparing(path -> path.getFileName().toString()))  // sort by folder name (which represents date)
                    .toList();
        } catch (IOException e) {
            System.err.println("Failed to list folders under " + startPath);
            e.printStackTrace();
            return;
        }

        for (Path folderPath : sortedFolders) {
            loadFolder(folderPath);
        }
    }

    /** Loads all CSV files found anywhere below one folder and prints how long the folder took. */
    private void loadFolder(Path folderPath) {
        Instant folderBegin = Instant.now();
        System.out.println("Processing folder: " + folderPath);

        try (Stream<Path> paths = Files.walk(folderPath)) {
            paths.filter(Files::isRegularFile)  // only consider files (ignore directories)
                    .filter(path -> path.toString().endsWith(".csv"))  // only process .csv files
                    .sorted(Comparator.comparing(path -> path.getFileName().toString()))  // sort by file name (which represents timestamp)
                    .forEach(this::loadFile);
        } catch (IOException e) {
            System.err.println("Failed to walk folder " + folderPath);
            e.printStackTrace();
        }

        Instant folderFinish = Instant.now();
        long timeElapsed = Duration.between(folderBegin, folderFinish).toSeconds();
        System.out.println("Time taken for folder " + folderPath.getFileName() + ": " + timeElapsed + " seconds");
    }

    /**
     * Parses one CSV file and saves its rows. A failure is reported and swallowed so that the
     * remaining files are still processed.
     */
    private void loadFile(Path csvFile) {
        try {
            List<AccumulatedPremium> premiums = parseCsvFile(csvFile.toString(), AccumulatedPremium::fromCsvRecord);
            System.out.println(csvFile);
            // NOTE(review): saveAllAndFlush flushes the pending inserts, but nothing in this class
            // clears or detaches the saved entities. If the whole load runs inside one long-lived
            // persistence context they presumably stay referenced until it closes - confirm.
            optionPostgreSQLRepository.saveAllAndFlush(premiums);
        } catch (Exception e) {
            System.err.println("Failed to load " + csvFile);
            e.printStackTrace();
        }
    }

    /**
     * Reads a CSV file and maps each data record to a value.
     *
     * @param filePath     path of the CSV file to read
     * @param recordMapper converts one CSV record into a result element
     * @param <T>          element type produced by {@code recordMapper}
     * @return the mapped records in file order; if reading or mapping fails part-way, the
     *         failure is reported on standard error and the records mapped so far are returned
     */
    public static <T> List<T> parseCsvFile(String filePath, Function<CSVRecord, T> recordMapper) {
        List<T> recordsList = new ArrayList<>();
        try (Reader reader = Files.newBufferedReader(Paths.get(filePath));
             CSVParser csvParser = new CSVParser(reader, CSV_FORMAT)) {

            for (CSVRecord csvRecord : csvParser) {
                recordsList.add(recordMapper.apply(csvRecord));
            }

        } catch (Exception e) {
            System.err.println("Failed to parse " + filePath + " after " + recordsList.size() + " record(s)");
            e.printStackTrace();
        }
        return recordsList;
    }
}
```

DAO 层:

```
/**
 * Spring Data JPA repository for {@link AccumulatedPremium} entities with {@code Long} ids.
 * Declares no methods of its own; everything comes from {@link JpaRepository}.
 */
public interface OptionPostgreSQLRepository extends JpaRepository<AccumulatedPremium, Long> {
    
}
```

实体:

     /**
      * JPA entity mapped to the {@code accumulated_premium_temp} table; one instance per CSV row.
      */
    @Entity
    @Table(name = "accumulated_premium_temp")
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class AccumulatedPremium {
        /** Pattern of the CSV "TimeStamp" column, e.g. {@code 2024-01-31T09:30}. */
        private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm");

        @Id
        @SequenceGenerator(
            name = "accumulated_premium_temp", // generator name referenced by @GeneratedValue below
            sequenceName = "accumulated_premium_temp_id_seq", // name of the database sequence
            allocationSize = 1000 // amount to increment by when allocating ids from the sequence
        )
        @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "accumulated_premium_temp")
        private Long id;

        @Column(name = "local_date_time")
        private LocalDateTime localDateTime;

        @Column(name = "ticker")
        private String ticker;

        @Column(name = "strike_price")
        private double strikePrice;

        @Column(name = "expire_date")
        private Date expireDate;

        @Column(name = "premium")
        private double premium;

        /**
         * Creates an entity without an id; the id field is left unset.
         */
        public AccumulatedPremium(LocalDateTime localDateTime, String ticker, double strikePrice, Date expireDate, double premium) {
            this.ticker = ticker;
            this.localDateTime = localDateTime;
            this.expireDate = expireDate;
            this.strikePrice = strikePrice;
            this.premium = premium;
        }

        /**
         * Builds an entity from one CSV record, reading the columns "TimeStamp", "Ticker",
         * "StrikePrice", "ExpirationDate" and "Premium" in that order.
         */
        public static AccumulatedPremium fromCsvRecord(CSVRecord record) {
            // Arguments are evaluated left to right, so columns are read and parsed in declaration order.
            return new AccumulatedPremium(
                LocalDateTime.parse(record.get("TimeStamp"), TIMESTAMP_FORMAT),
                record.get("Ticker"),
                Double.parseDouble(record.get("StrikePrice")),
                Date.valueOf(record.get("ExpirationDate")),
                Double.parseDouble(record.get("Premium"))
            );
        }
    }

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>21</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.10.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.hibernate.orm.tooling</groupId>
                <artifactId>hibernate-enhance-maven-plugin</artifactId>
                <version>${hibernate.version}</version>
                <executions>
                    <execution>
                        <id>enhance</id>
                        <goals>
                            <goal>enhance</goal>
                        </goals>
                        <configuration>
                            <enableLazyInitialization>true</enableLazyInitialization>
                            <enableDirtyTracking>true</enableDirtyTracking>
                            <enableAssociationManagement>true</enableAssociationManagement>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.graalvm.buildtools</groupId>
                <artifactId>native-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

应用程序属性:

# HTTP port the embedded web server listens on
server.port=8091
# DataSource Configuration
spring.datasource.url=
spring.datasource.username=
spring.datasource.password=
# JPA/Hibernate Properties
# NOTE(review): spring.jpa.open-in-view is not set in this file, so the Spring Boot default applies
# (presumably enabled - confirm). With it enabled, a persistence context is presumably kept open
# for the whole web request, which matters for long-running loads started from a controller.
spring.jpa.show-sql=false
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect
# Optional: If you want to view the batch operations in logs (turn off in production for better performance)
spring.jpa.properties.hibernate.format_sql=true
spring.jpa.properties.hibernate.generate_statistics=true
# JDBC batching: batch size plus ordering of inserts/updates
spring.jpa.properties.hibernate.jdbc.batch_size=1000
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
# DataSource settings
spring.datasource.hikari.auto-commit=false
spring.datasource.hikari.maximum-pool-size=50

控制器层暴露的 RESTful API 用于 ETL:把 400 × 480 个 CSV 文件加载到 PostgreSQL 的单张表中。当 Spring Boot 项目开始从 CSV 文件向 PostgreSQL 加载数据后,性能变得越来越慢。我用 Eclipse Memory Analyzer 检查过:ETL 运行期间堆呈线性增长,原因是 Hibernate 有状态持久化上下文中的 HashMap 和 IdentityHashMap。正如您所看到的,我已经启用了批量插入和事务边界,以便刷新 Hibernate 缓存中的持久化实体。但堆中的老年代仍然因为上述 HashMap 和 IdentityHashMap 而线性增长。更奇怪的是,如果我把完全相同的逻辑移到 Spring Boot 启动类中,如下所示:

@SpringBootApplication
public class SpringBootApplication implements CommandLineRunner {
    @Autowired
    OptionPostgreSQLRepository optionPostgreSQLRepository;
    @Autowired
    OptionService optionService;
    public static void main(String[] args) {
        SpringApplication.run(SpringBootApplication.class, args);
    }
    @Override
    public void run(String... args) throws Exception {
        optionService.saveAllAccumulatedPremiums();
    }
    public void POC_ETL_ALL_AccumulatedPremium(){
        Path startPath = Paths.get(/data");
        try (Stream<Path> folderPaths = Files.walk(startPath, 1)) {
            List<Path> sortedFolders = folderPaths
                    .filter(Files::isDirectory)  // only consider directories
                    .filter(path -> !path.equals(startPath))  // exclude the startPath itself
                    .sorted(Comparator.comparing(path -> path.getFileName().toString()))  // sort by folder name (which represents date)
                    .toList();

            for (Path folderPath : sortedFolders) {
                Instant folderBegin = Instant.now();
                System.out.println("Processing folder: " + folderPath);

                try (Stream<Path> paths = Files.walk(folderPath)) {
                    paths.filter(Files::isRegularFile)  // only consider files (ignore directories)
                            .filter(path -> path.toString().endsWith(".csv"))  // only process .csv files
                            .sorted(Comparator.comparing(path -> path.getFileName().toString()))  // sort by folder name (which represents timestamp)
                            .forEach(path -> {
                                try {
                                    List<AccumulatedPremium> premiums = parseCsvFile(path.toString(), AccumulatedPremium::fromCsvRecord);
                                    System.out.println(path);

                                    Instant postgreStart = Instant.now();
                                    optionPostgreSQLRepository.saveAll(premiums);
                                    Instant postgreFinish = Instant.now();
                                    long timeElapsed = Duration.between(postgreStart, postgreFinish).toMillis();
                                    System.out.println("Time taken for postgre " + ": " + timeElapsed + " milliseconds");



                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            });
                } catch (IOException e) {
                    e.printStackTrace();
                }

                Instant folderFinish = Instant.now();
                long timeElapsed = Duration.between(folderBegin, folderFinish).toSeconds();
                System.out.println("Time taken for folder " + folderPath.getFileName() + ": " + timeElapsed + " seconds");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static <T> List<T> parseCsvFile(String filePath, Function<CSVRecord, T> recordMapper) {
        List<T> recordsList = new ArrayList<>();
        try (Reader reader = Files.newBufferedReader(Paths.get(filePath));
             CSVParser csvParser = new CSVParser(reader, CSVFormat.DEFAULT
                     .withFirstRecordAsHeader()
                     .withIgnoreHeaderCase()
                     .withTrim())) {

            for (CSVRecord csvRecord : csvParser) {
                T record = recordMapper.apply(csvRecord);
                recordsList.add(record);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        return recordsList;
    }
}

性能就一直很好,堆使用量从未超过 100 MB。

比较奇怪的是,对于application.properties配置,我打开sql统计: spring.jpa.properties.hibernate.generate_statistics=true

但是,只有在 Spring Boot 启动类中运行这段逻辑时才会打印统计信息。如果通过调用控制器的 RESTful 端点来运行,则不会打印统计信息,并且由于 IdentityHashMap 和 HashMap,堆在运行数小时后会线性增长到 7GB。同样,我在内存分析器中看到这是由 Hibernate 的类引起的;我假设这些类由 Spring Data JPA 使用,并且应当由 Spring Data JPA 的 saveAllAndFlush API 中的 flush 清理。

任何人都可以给我一些建议或提示如何更深入地解决这个问题吗?

java spring-boot spring-data-jpa
1个回答
0
投票

看起来可能是由 Open Session In View 引起的。

您可以尝试添加吗

spring.jpa.open-in-view=false

并检查是否有帮助?

了解更多背景信息: Spring Boot 中的 spring.jpa.open-in-view=true 属性是什么?

© www.soinside.com 2019 - 2024. All rights reserved.