我定义了一个简单的作业,应该从数据库读取记录,处理项目并更新/写入已成功处理的项目的状态列。
写入器被卡住(在尝试提交写入块的事务时),因为 itemReader 已经锁定了表。ItemReader 的输入由以下查询定义:
SELECT ... FOR UPDATE SKIP LOCKED LIMIT 500
当 ItemReader 锁定行时,如何使 ItemWriter 能够将项目写入数据库?
我尝试将自定义
PlatformTransactionManager
bean 定义为
@Bean
public PlatformTransactionManager transactionManager(final DataSource dataSource) {
return new JdbcTransactionManager(dataSource);
}
这与 JdbcCursorItemReader 的
useSharedExtendedConnection
参数有关。默认情况下,游标将在其自己的连接中打开,并且不会参与为步骤处理的其余部分启动的任何事务。这就是您遇到该问题的原因。
您需要将该参数设置为
true
并将数据源包装在 Spring Batch 提供的 ExtendedConnectionDataSourceProxy
中(如 Javadoc 中所述)。
这里是一个完整的示例(基于 Spring Batch 5.0.3 和 TestContainers,您可以将其添加到此处以进行快速测试):
/*
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.test.repository;
import java.util.Objects;
import javax.sql.DataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.ExtendedConnectionDataSourceProxy;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.jdbc.support.JdbcTransactionManager;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Shows a job whose cursor reader ({@code select ... for update skip locked}) and
 * chunk writer work against the same database without deadlocking: the data source
 * is wrapped in an {@link ExtendedConnectionDataSourceProxy} and the reader is
 * configured with {@code useSharedExtendedConnection(true)}, so the cursor
 * participates in the chunk transaction instead of holding row locks on a
 * separate connection.
 *
 * @author Mahmoud Ben Hassine
 */
@Testcontainers(disabledWithoutDocker = true)
@SpringJUnitConfig
class PostgreSQLJobRepositoryIntegrationTests {

	private static final DockerImageName POSTGRESQL_IMAGE = DockerImageName.parse("postgres:13.3");

	@Container
	public static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>(POSTGRESQL_IMAGE);

	@Autowired
	private DataSource dataSource;

	@Autowired
	private JdbcTemplate jdbcTemplate;

	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private Job job;

	/** Creates the batch meta-data tables plus a small {@code person} fixture. */
	@BeforeEach
	void setUp() {
		// Spring Batch meta-data schema (job repository tables).
		ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
		populator.addScript(new ClassPathResource("/org/springframework/batch/core/schema-postgresql.sql"));
		populator.execute(this.dataSource);
		// Business table: two rows still to process, two already processed.
		jdbcTemplate.execute("create table person (id bigint not null primary key, name varchar not null, processed boolean)");
		jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo1', false)");
		jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'FOO2', true)");
		jdbcTemplate.execute("insert into person (id, name, processed) values (3, 'bar1', false)");
		jdbcTemplate.execute("insert into person (id, name, processed) values (4, 'BAR2', true)");
	}

	/** Runs the job and prints the table content so the updates are visible. */
	@Test
	void testJobExecution() throws Exception {
		// given
		JobParameters jobParameters = new JobParametersBuilder().toJobParameters();

		// when
		JobExecution jobExecution = this.jobLauncher.run(this.job, jobParameters);

		// then
		assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
		jdbcTemplate
			.query("select * from person", new DataClassRowMapper<>(Person.class))
			.forEach(person -> System.out.println("person = " + person));
	}

	@Configuration
	@EnableBatchProcessing
	static class TestConfiguration {

		/**
		 * The container's data source, wrapped so the reader's cursor connection
		 * can be shared with the chunk transaction.
		 */
		@Bean
		public DataSource dataSource() throws Exception {
			PGSimpleDataSource pgDataSource = new PGSimpleDataSource();
			pgDataSource.setURL(postgres.getJdbcUrl());
			pgDataSource.setUser(postgres.getUsername());
			pgDataSource.setPassword(postgres.getPassword());
			return new ExtendedConnectionDataSourceProxy(pgDataSource);
		}

		@Bean
		public JdbcTransactionManager transactionManager(DataSource dataSource) {
			return new JdbcTransactionManager(dataSource);
		}

		@Bean
		public JdbcTemplate jdbcTemplate(DataSource dataSource) {
			return new JdbcTemplate(dataSource);
		}

		/**
		 * Cursor reader over unprocessed rows; the row locks it takes are
		 * released when the chunk transaction commits.
		 */
		@Bean
		public JdbcCursorItemReader<Person> itemReader(DataSource dataSource) {
			String query = "select * from person where processed = false for update skip locked limit 2";
			return new JdbcCursorItemReaderBuilder<Person>()
					.name("personItemReader")
					.dataSource(dataSource)
					.sql(query)
					.beanRowMapper(Person.class)
					// Key setting: share the (extended) connection with the step transaction.
					.useSharedExtendedConnection(true)
					.build();
		}

		/** Upper-cases the name and flags the item as processed. */
		@Bean
		public ItemProcessor<Person, Person> itemProcessor() {
			return p -> new Person(p.getId(), p.getName().toUpperCase(), true);
		}

		/** Writes each processed item back with a named-parameter update. */
		@Bean
		public JdbcBatchItemWriter<Person> itemWriter(DataSource dataSource) {
			String updateStatement = "update person set name = :name, processed = :processed where id = :id";
			return new JdbcBatchItemWriterBuilder<Person>()
					.dataSource(dataSource)
					.sql(updateStatement)
					.beanMapped()
					.build();
		}

		/** Single-step job: chunk size 2 matches the reader's {@code limit 2}. */
		@Bean
		public Job job(JobRepository jobRepository, JdbcTransactionManager transactionManager,
				JdbcCursorItemReader<Person> itemReader, ItemProcessor<Person, Person> itemProcessor,
				JdbcBatchItemWriter<Person> itemWriter) {
			var step = new StepBuilder("step", jobRepository)
					.<Person, Person>chunk(2, transactionManager)
					.reader(itemReader)
					.processor(itemProcessor)
					.writer(itemWriter)
					.build();
			return new JobBuilder("job", jobRepository).start(step).build();
		}

	}

	/**
	 * Mutable JavaBean (default constructor + setters) as required by the
	 * bean-property row mapper used in the reader.
	 */
	public static final class Person {

		private int id;

		private String name;

		private boolean processed;

		public Person() {
		}

		public Person(int id, String name, boolean processed) {
			this.id = id;
			this.name = name;
			this.processed = processed;
		}

		public int getId() {
			return id;
		}

		public void setId(int id) {
			this.id = id;
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public boolean isProcessed() {
			return processed;
		}

		public void setProcessed(boolean processed) {
			this.processed = processed;
		}

		@Override
		public boolean equals(Object obj) {
			if (this == obj) {
				return true;
			}
			if (obj == null || getClass() != obj.getClass()) {
				return false;
			}
			Person other = (Person) obj;
			return id == other.id && Objects.equals(name, other.name) && processed == other.processed;
		}

		@Override
		public int hashCode() {
			return Objects.hash(id, name, processed);
		}

		@Override
		public String toString() {
			return "Person[" +
					"id=" + id + ", " +
					"name=" + name + ", " +
					"processed=" + processed + ']';
		}

	}

}
请注意,当未设置
useSharedExtendedConnection
并且数据源未包装在 ExtendedConnectionDataSourceProxy
中时,示例如何按照您的描述挂起。