Importing a CSV into a database with Spring Batch: RepositoryItemWriter does not insert into the MySQL database using JPA


I have a CSV file that I need to import into a MySQL database. It is larger than 8 GB. To do this, I started looking into Spring Batch. I first created a Spring Batch application:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.4</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>fr.test.batch</groupId>
<artifactId>mysql-import</artifactId>
<version>0.0.1-SNAPSHOT</version>

<name>mysql-import-batch</name>
<description>mysql-import-batch</description>

<properties>
    <java.version>17</java.version>
    <fasterxml.jackson.version>2.15.2</fasterxml.jackson.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!-- Provides transitive vulnerable dependency maven:org.yaml:snakeyaml:1.33 -->
    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>2.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>${fasterxml.jackson.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>${fasterxml.jackson.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${fasterxml.jackson.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml -->
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>${fasterxml.jackson.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>${fasterxml.jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

My CSV file has a whole set of columns: Siren, nic, siret, statutDiffusionEtablissement, dateCreationEtablissement...

The CSV does not contain any primary key. I am also using JPA, so I created an entity:

@Entity
@Table(name="establishments")
@Data
@AllArgsConstructor @NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EstablishmentEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @JsonProperty(value="siren")
    @JsonAlias("siren")
    private String siren;

    @JsonProperty(value="icn")
    @JsonAlias("nic")
    private String icn;

    @JsonProperty(value="siret")
    @JsonAlias("siret")
    private String siret;

   ...
}

I also have a repository:

@Repository
public interface EstablishmentRepository extends CrudRepository<EstablishmentEntity, Long> {
}
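In the configuration below, this repository is handed to RepositoryItemWriter with .methodName("save"); roughly speaking, the writer then calls that repository method once for each item of a chunk. The plain-Java sketch below models that name-based delegation with reflection. InMemoryRepository and ReflectiveWriter are hypothetical stand-ins, not Spring classes, and the sketch ignores transactions entirely.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Spring Data repository: keeps saved items in memory.
class InMemoryRepository {
    final List<Object> saved = new ArrayList<>();

    public Object save(Object item) {
        saved.add(item);
        return item;
    }
}

// Simplified model of a writer configured with a method name: it looks up a public
// method with that name taking a single Object argument and invokes it per item.
class ReflectiveWriter {
    private final Object repository;
    private final String methodName;

    ReflectiveWriter(Object repository, String methodName) {
        this.repository = repository;
        this.methodName = methodName;
    }

    void write(List<?> chunk) {
        try {
            Method method = repository.getClass().getMethod(methodName, Object.class);
            for (Object item : chunk) {
                method.invoke(repository, item); // one repository call per item
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }
}
```

Nothing reaches the store unless write is actually called, which is why it helps to confirm that the writer runs at all before looking at transactions.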

Then I tried to configure a batch class:

@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
public class InseeBatchConfiguration {
    private static final Logger log = LoggerFactory.getLogger(InseeBatchConfiguration.class);

    //Reader class Object
    @Bean
    public FlatFileItemReader<EstablishmentEntity> reader() {
        log.warn("reader() called");
        return new FlatFileItemReaderBuilder<EstablishmentEntity>()
                .name("establishmentItemReader")
                .resource(new FileSystemResource("D:\\StockEtablissement_utf8.csv"))
                .delimited()
                .names("siren",
                        "nic",
                        "siret",
                        "statutDiffusionEtablissement",
                        "dateCreationEtablissement"...)
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                    setTargetType(EstablishmentEntity.class);
                }})
                .build();
    }

    
    @Bean
    public EstablishmentItemProcessor processor() {
        log.warn("processor() called");
        return new EstablishmentItemProcessor();
    }

    //Writer class Object
    @Bean
    public RepositoryItemWriter<EstablishmentEntity> writer(EstablishmentRepository repository) {
        log.warn("writer() called");
        return new RepositoryItemWriterBuilder<EstablishmentEntity>()
                .repository(repository)
                .methodName("save")
                .build();
    }

    
    @Bean
    public Job importJob(
            JobRepository jobRepository,
            BatchListener listener,
            Step step1) {
        log.warn("importJob() called");
        return new JobBuilder("import", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            RepositoryItemWriter<EstablishmentEntity> writer) {
        log.warn("step() called");
        return new StepBuilder("step1", jobRepository)
                .<EstablishmentEntity, EstablishmentEntity> chunk(10, transactionManager)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
}
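With chunk(10, transactionManager), the step reads items one at a time until it has 10, runs each through the processor (a null result filters the item out), passes the resulting list to the writer in a single call, and commits that chunk's transaction through the given transaction manager. The plain-Java loop below is a simplified, single-threaded model of that cycle, assuming no retry, skip, or restart handling; ChunkLoop is a hypothetical name, not a Spring Batch class.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ChunkLoop {
    // Returns the number of chunks (i.e. transactions) the loop would commit.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int commits = 0;
        while (reader.hasNext()) {
            // 1. Read up to chunkSize items: the commit interval counts items read.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process them; a null result drops the item from the chunk.
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Hand the whole chunk to the writer in one call, then "commit".
            //    In this sketch the writer is skipped if every item was filtered.
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
            }
            commits++;
        }
        return commits;
    }
}
```

For an 8 GB file, the chunk size is the main knob here: 10 items per transaction means a very large number of commits, so a larger value is usually worth trying once the writes work.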

The main method is as follows:

public static void main(String[] args) {
     // SpringApplication.run(MysqlImportBatchApplication.class, args);
    System.exit(SpringApplication.exit(SpringApplication.run(MysqlImportBatchApplication.class, args)));
}

I have the following application properties:

#mysql database connection
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/insee?createDatabaseIfNotExist=true&allowPublicKeyRetrieval=true&useSSL=false
spring.datasource.username = root
spring.datasource.password = root
spring.jpa.generate-ddl=true

#disable job run at startup

#----------ORM Details-------------------
#To display SQL At console
spring.jpa.show-sql=true
#To Create tables
spring.jpa.hibernate.ddl-auto=update
#To Generate SQL queries
spring.jpa.database-platform=org.hibernate.dialect.MySQL8Dialect
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL8Dialect


#----------Spring Batch Properties----------
# true by default, which means all Spring Batch jobs are launched automatically at startup
spring.batch.job.enabled=false
# Tables for metadata created by Spring Boot (Always, Embedded, Never)
spring.batch.jdbc.initialize-schema=ALWAYS
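Since spring.batch.job.enabled=false stops Spring Boot from launching jobs at startup (as the comment above says), the import job only runs if something launches it explicitly. A minimal sketch of such a launcher, assuming a separate configuration class (ImportJobLauncherConfiguration and the startedAt parameter are hypothetical names) and the JobLauncher bean that Spring Batch registers:

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ImportJobLauncherConfiguration {

    // Launches the "import" job once the application context has started.
    @Bean
    public CommandLineRunner launchImportJob(JobLauncher jobLauncher, Job importJob) {
        return args -> {
            // JobLauncher.run does not apply the job's RunIdIncrementer, so a unique
            // parameter is added by hand; relaunching with identical parameters would
            // target a job instance that has already completed.
            JobParameters parameters = new JobParametersBuilder()
                    .addLong("startedAt", System.currentTimeMillis())
                    .toJobParameters();
            JobExecution execution = jobLauncher.run(importJob, parameters);
            System.out.println("Import job ended with status " + execution.getStatus());
        };
    }
}
```

With the default synchronous launcher, run returns only after the job finishes, so the status line also shows whether the step completed or failed.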

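When I start the application by running the main method, it starts correctly and creates the tables if they do not exist, but no data is written.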


Following Mahmoud Ben Hassine's answer, I updated my code:

1. Implemented a JpaTransactionManager

 @Bean
 public JpaTransactionManager transactionManager() throws SQLException {
     JpaTransactionManager transactionManager = new JpaTransactionManager();
     transactionManager.setEntityManagerFactory(entityManagerFactoryBean().getObject());
     transactionManager.setDataSource(entityManagerFactoryBean().getDataSource());
     return transactionManager;
 }
 
 @Bean(name = "entityManagerFactory")
 public LocalContainerEntityManagerFactoryBean entityManagerFactoryBean() throws SQLException {
     LocalContainerEntityManagerFactoryBean entityManagerFactoryBean = new LocalContainerEntityManagerFactoryBean();
     entityManagerFactoryBean.setJpaVendorAdapter(vendorAdaptor());
     entityManagerFactoryBean.setDataSource(datasource());
     entityManagerFactoryBean.setPersistenceProviderClass(HibernatePersistenceProvider.class);
     entityManagerFactoryBean.setPackagesToScan(ENTITYMANAGER_PACKAGES_TO_SCAN);
     entityManagerFactoryBean.setJpaProperties(jpaHibernateProperties());
     entityManagerFactoryBean.setJpaDialect(new HibernateJpaDialect());
     return entityManagerFactoryBean;
 }
 
 private HibernateJpaVendorAdapter vendorAdaptor() {
     HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
     vendorAdapter.setShowSql(true);
     return vendorAdapter;
 }
 
 @Bean(name = "dataSource")
 @ConfigurationProperties(prefix = "spring.datasource")
 public DataSource datasource() {
     HikariConfig config = new HikariConfig();
     config.setJdbcUrl(env.getProperty("spring.datasource.url"));
     config.setUsername(env.getProperty("spring.datasource.username"));
     ...
     config.setPassword(env.getProperty("spring.datasource.password"));
     config.setMinimumIdle(Integer.parseInt(...)); // argument truncated in the original post
     Properties props = new Properties();
     props.put(
             "cachePrepStmts", // HikariCP hands these keys to the JDBC driver as-is, so use the bare driver property name
             env.getProperty("spring.datasource.hikari.data-source-properties.cachePrepStmts"));
     ...  
     config.setDataSourceProperties(props);
 
     log.warn("## datasource() called");
     return new HikariDataSource(config);
 }
 
 private Properties jpaHibernateProperties() {
     Properties properties = new Properties();
     ...
     properties.put(PROPERTY_NAME_HIBERNATE_SHOW_SQL, env.getProperty(PROPERTY_NAME_HIBERNATE_SHOW_SQL));
     properties.put(PROPERTY_NAME_HIBERNATE_DIALECT, env.getProperty(PROPERTY_NAME_HIBERNATE_DIALECT));
     //properties.put(AvailableSettings.JAKARTA_HBM2DDL_DATABASE_ACTION, "none");
     return properties;
 }
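Because the entity manager factory is exposed as a bean (entityManagerFactoryBean() above is named entityManagerFactory), the transaction manager could instead receive the EntityManagerFactory as a parameter rather than calling the factory-bean method twice. A minimal alternative sketch, assuming it replaces the transactionManager() method above and the rest of the configuration stays as shown (JpaTransactionConfiguration is a hypothetical class name):

```java
import jakarta.persistence.EntityManagerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.JpaTransactionManager;

@Configuration
public class JpaTransactionConfiguration {

    // Spring injects the EntityManagerFactory built by the
    // LocalContainerEntityManagerFactoryBean. JpaTransactionManager autodetects
    // the DataSource of a Spring-built factory, so setDataSource is not needed.
    @Bean
    public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }
}
```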

2. I also updated my step1:

@Bean
public Step step1(
        BatchReadListener listener,
        JobRepository jobRepository,
        JpaTransactionManager transactionManager,
        RepositoryItemWriter<EstablishmentEntity> writer) throws IOException {
    log.warn("step() called");
    return new StepBuilder("step1", jobRepository)
            .<EstablishmentEntity, EstablishmentEntity> chunk(10, transactionManager)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .listener(listener)
            .build();
}

I also added a listener for testing purposes:

@Service
public class BatchReadListener implements ItemReadListener<EstablishmentEntity> {
    @Override
    public void beforeRead() {
        // ItemReadListener.super.beforeRead();
        System.out.println("Before reading ...");
    }

    @Override
    public void afterRead(EstablishmentEntity item) {
        System.out.println("After reading ...");
        System.out.println("## " +  item.getSiret());

    }

    @Override
    public void onReadError(Exception ex) {
        ex.printStackTrace();
        // ItemReadListener.super.onReadError(ex);
    }
}
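The read listener only shows that items are read. To check whether the writer is called at all, and with how many items, an ItemWriteListener can be registered on the step in the same way. A minimal sketch for Spring Batch 5, where the write callbacks receive a Chunk; the class name BatchWriteListener is hypothetical and EstablishmentEntity is the entity from the question:

```java
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.item.Chunk;
import org.springframework.stereotype.Component;

// Logs every write call so the console shows whether chunks reach the writer.
@Component
public class BatchWriteListener implements ItemWriteListener<EstablishmentEntity> {

    @Override
    public void beforeWrite(Chunk<? extends EstablishmentEntity> items) {
        System.out.println("Before writing " + items.size() + " items ...");
    }

    // Runs after the writer returns but before the chunk's transaction commits.
    @Override
    public void afterWrite(Chunk<? extends EstablishmentEntity> items) {
        System.out.println("After writing " + items.size() + " items ...");
    }

    @Override
    public void onWriteError(Exception exception, Chunk<? extends EstablishmentEntity> items) {
        System.out.println("Write failed for " + items.size() + " items");
        exception.printStackTrace();
    }
}
```

It would be wired like the read listener: add a BatchWriteListener parameter to step1 and a second .listener(writeListener) call on the builder. If neither "Before reading" nor "Before writing" appears in the console, the step itself is not running.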

When I start the application, the data is still not stored in the database. I do get logs about the database connection:

HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0).
HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0).
HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0).
HikariPool-1 - Closing connection com.mysql.cj.jdbc.ConnectionImpl@61c9d64b: (connection has passed maxLifetime)
Answer (1 vote):

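RepositoryItemWriter is based on a JPA repository, so you need to make sure that the PlatformTransactionManager transactionManager autowired in your step is of type JpaTransactionManager, and not DataSourceTransactionManager or JdbcTransactionManager (which seems to be the default type auto-configured by Spring Boot).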

You can change the step signature to:

@Bean
public Step step1(
   JobRepository jobRepository,
   JpaTransactionManager transactionManager,
   RepositoryItemWriter<EstablishmentEntity> writer) {
 ...
}

This will make the error obvious: if no JpaTransactionManager bean is defined in the context, the application will fail to start.
