I have a CSV file that I have to import into a MySQL database. It is larger than 8 GB. To do this, I started looking into Spring Batch. I first created a Spring Batch application:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>fr.test.batch</groupId>
<artifactId>mysql-import</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>mysql-import-batch</name>
<description>mysql-import-batch</description>
<properties>
<java.version>17</java.version>
<fasterxml.jackson.version>2.15.2</fasterxml.jackson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Provides transitive vulnerable dependency maven:org.yaml:snakeyaml:1.33 -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
My CSV file has a whole set of columns: siren, nic, siret, statutDiffusionEtablissement, dateCreationEtablissement, ...
The CSV does not contain any primary key. I am also using JPA, so I created an entity:
@Entity
@Table(name="establishments")
@Data
@AllArgsConstructor @NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EstablishmentEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@JsonProperty(value="siren")
@JsonAlias("siren")
private String siren;
@JsonProperty(value="icn")
@JsonAlias("nic")
private String icn;
@JsonProperty(value="siret")
@JsonAlias("siret")
private String siret;
...
}
I also have a repository:
@Repository
public interface EstablishmentRepository extends CrudRepository<EstablishmentEntity, Long> {
}
Now I am trying to configure a batch class:
@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
public class InseeBatchConfiguration {
private static final Logger log = LoggerFactory.getLogger(InseeBatchConfiguration.class);
//Reader class Object
@Bean
public FlatFileItemReader<EstablishmentEntity> reader() {
log.warn("reader() called");
return new FlatFileItemReaderBuilder<EstablishmentEntity>()
.name("establishmentItemReader")
.resource(new FileSystemResource("D:\\StockEtablissement_utf8.csv"))
.delimited()
.names("siren",
"nic",
"siret",
"statutDiffusionEtablissement",
"dateCreationEtablissement"...)
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(EstablishmentEntity.class);
}})
.build();
}
@Bean
public EstablishmentItemProcessor processor() {
log.warn("processor() called");
return new EstablishmentItemProcessor();
}
//Writer class Object
@Bean
public RepositoryItemWriter<EstablishmentEntity> writer(EstablishmentRepository repository) {
log.warn("writer() called");
return new RepositoryItemWriterBuilder<EstablishmentEntity>()
.repository(repository)
.methodName("save")
.build();
}
@Bean
public Job importJob(
JobRepository jobRepository,
BatchListener listener,
Step step1) {
log.warn("importJob() called");
return new JobBuilder("import", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
RepositoryItemWriter<EstablishmentEntity> writer) {
log.warn("step() called");
return new StepBuilder("step1", jobRepository)
.<EstablishmentEntity, EstablishmentEntity> chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
}
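As a side note on the reader configured above: `.delimited().names(...)` pairs each token of a line with the name at the same position, and the field set mapper then uses those names to populate the bean. The following is only a plain-Java sketch of that idea, not Spring Batch's implementation. The class `DelimitedMappingSketch`, the method `toFieldSet` and the sample values are made up for illustration, only the first three columns are used, and quoted fields are not handled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DelimitedMappingSketch {

    // Column names in file order; only the first three columns from the question are used here.
    static final List<String> NAMES = List.of("siren", "nic", "siret");

    // Splits one comma-delimited line and pairs each token with the name at the same position.
    public static Map<String, String> toFieldSet(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty columns
        if (tokens.length != NAMES.size()) {
            throw new IllegalArgumentException(
                    "Expected " + NAMES.size() + " tokens but found " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < tokens.length; i++) {
            fields.put(NAMES.get(i), tokens[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        // Made-up sample values, not taken from the real file.
        Map<String, String> fields = toFieldSet("123456789,00012,12345678900012");
        System.out.println(fields);
    }
}
```

The point of the sketch is that the number and order of the names must match the columns of the file, and that a name such as `nic` still has to be resolved to a property of the target bean (here the entity calls it `icn`).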
The main method is as follows:
public static void main(String[] args) {
// SpringApplication.run(MysqlImportBatchApplication.class, args);
System.exit(SpringApplication.exit(SpringApplication.run(MysqlImportBatchApplication.class, args)));
}
I have the following application properties:
#mysql database connection
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/insee?createDatabaseIfNotExist=true&allowPublicKeyRetrieval=true&useSSL=false
spring.datasource.username = root
spring.datasource.password = root
spring.jpa.generate-ddl=true
#disabled job run at startup
#----------ORM Details-------------------
#To display SQL At console
spring.jpa.show-sql=true
#To Create tables
spring.jpa.hibernate.ddl-auto=update
#To Generate SQL queries
spring.jpa.database-platform=org.hibernate.dialect.MySQL8Dialect
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL8Dialect
#----------Spring Batch Properties----------
# By default it's true which means all the Spring batches will start executing automatically
spring.batch.job.enabled=false
# Tables for metadata created by Spring Boot (Always, Embedded, Never)
spring.batch.jdbc.initialize-schema=ALWAYS
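When I start the application by running the main method, it starts correctly and creates the table if it does not exist, but no data is written.
Following Mahmoud Ben Hassine's answer, I updated my code:
1. Implemented a JpaTransactionManager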
@Bean
public JpaTransactionManager transactionManager() throws SQLException {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setEntityManagerFactory(entityManagerFactoryBean().getObject());
transactionManager.setDataSource(entityManagerFactoryBean().getDataSource());
return transactionManager;
}
@Bean(name = "entityManagerFactory")
public LocalContainerEntityManagerFactoryBean entityManagerFactoryBean() throws SQLException {
LocalContainerEntityManagerFactoryBean entityManagerFactoryBean = new LocalContainerEntityManagerFactoryBean();
entityManagerFactoryBean.setJpaVendorAdapter(vendorAdaptor());
entityManagerFactoryBean.setDataSource(datasource());
entityManagerFactoryBean.setPersistenceProviderClass(HibernatePersistenceProvider.class);
entityManagerFactoryBean.setPackagesToScan(ENTITYMANAGER_PACKAGES_TO_SCAN);
entityManagerFactoryBean.setJpaProperties(jpaHibernateProperties());
entityManagerFactoryBean.setJpaDialect(new HibernateJpaDialect());
return entityManagerFactoryBean;
}
private HibernateJpaVendorAdapter vendorAdaptor() {
HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
vendorAdapter.setShowSql(true);
return vendorAdapter;
}
@Bean(name = "dataSource")
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource datasource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(env.getProperty("spring.datasource.url"));
config.setUsername(env.getProperty("spring.datasource.username"));
...
config.setPassword(env.getProperty("spring.datasource.password"));
config.setMinimumIdle(Integer.parseInt(...));
Properties props = new Properties();
props.put(
"spring.datasource.hikari.data-source-properties.cachePrepStmts",
env.getProperty("spring.datasource.hikari.data-source-properties.cachePrepStmts"));
...
config.setDataSourceProperties(props);
log.warn("## datasource() called");
return new HikariDataSource(config);
}
private Properties jpaHibernateProperties() {
Properties properties = new Properties();
...
properties.put(PROPERTY_NAME_HIBERNATE_SHOW_SQL, env.getProperty(PROPERTY_NAME_HIBERNATE_SHOW_SQL));
properties.put(PROPERTY_NAME_HIBERNATE_DIALECT, env.getProperty(PROPERTY_NAME_HIBERNATE_DIALECT));
//properties.put(AvailableSettings.JAKARTA_HBM2DDL_DATABASE_ACTION, "none");
return properties;
}
2. I also updated my step1.
@Bean
public Step step1(
BatchReadListener listener,
JobRepository jobRepository,
JpaTransactionManager transactionManager,
RepositoryItemWriter<EstablishmentEntity> writer) throws IOException {
log.warn("step() called");
return new StepBuilder("step1", jobRepository)
.<EstablishmentEntity, EstablishmentEntity> chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.listener(listener)
.build();
}
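To make `chunk(10, transactionManager)` in the step above more concrete: items are read one at a time, buffered until the chunk size is reached, and then handed to the writer as one list, with each chunk handled in its own transaction. The following is only a plain-Java sketch of that loop under simplifying assumptions (no processor, no transactions, no restart or skip logic). The class `ChunkProcessingSketch` and the method `processInChunks` are hypothetical names, not Spring Batch APIs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class ChunkProcessingSketch {

    // Reads items one at a time, buffers up to chunkSize of them, then hands the
    // whole buffer to the writer. Returns the number of chunks written.
    public static <T> int processInChunks(Iterator<T> reader, int chunkSize, Consumer<List<T>> writer) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int chunks = 0;
        List<T> buffer = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            buffer.add(reader.next());
            if (buffer.size() == chunkSize) {
                // In Spring Batch, reading, processing and writing of one chunk share one transaction.
                writer.accept(List.copyOf(buffer));
                buffer.clear();
                chunks++;
            }
        }
        if (!buffer.isEmpty()) {
            // Last chunk, possibly smaller than chunkSize.
            writer.accept(List.copyOf(buffer));
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            items.add(i);
        }
        List<Integer> sizes = new ArrayList<>();
        int chunks = processInChunks(items.iterator(), 10, chunk -> sizes.add(chunk.size()));
        System.out.println(chunks + " chunks with sizes " + sizes);
    }
}
```

With 25 items and a chunk size of 10, the sketch prints `3 chunks with sizes [10, 10, 5]`. For a file of several gigabytes, a chunk size of 10 means a very large number of small commits, so the chunk size is one of the first settings worth experimenting with.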
I also added a listener for testing purposes.
@Service
public class BatchReadListener implements ItemReadListener<EstablishmentEntity> {
@Override
public void beforeRead() {
// ItemReadListener.super.beforeRead();
System.out.println("Before reading ...");
}
@Override
public void afterRead(EstablishmentEntity item) {
System.out.println("After reading ...");
System.out.println("## " + item.getSiret());
}
@Override
public void onReadError(Exception ex) {
ex.printStackTrace();
// ItemReadListener.super.onReadError(ex);
}
}
When I start the application, the data is still not stored in the database. I do get logs about the database connection:
HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0).
HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0).
HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0).
HikariPool-1 - Closing connection com.mysql.cj.jdbc.ConnectionImpl@61c9d64b: (connection has passed maxLifetime)
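RepositoryItemWriter is based on a JPA repository, so you need to make sure that the PlatformTransactionManager transactionManager autowired into the step is of type JpaTransactionManager, and not DataSourceTransactionManager or JdbcTransactionManager (which seems to be the default type auto-configured by Spring Boot).
You can change the step signature to: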
@Bean
public Step step1(
JobRepository jobRepository,
JpaTransactionManager transactionManager,
RepositoryItemWriter<EstablishmentEntity> writer) {
...
}
This makes the error obvious: if no JpaTransactionManager bean is defined in the context, the application will fail to start.