当 ItemReader 锁定行时,如何使 ItemWriter 能够将项目写入数据库?

问题描述 投票:0回答:1

我定义了一个简单的作业,应该从数据库读取记录,处理项目并更新/写入已成功处理的项目的状态列。

写入器被卡住(当尝试写入块时提交事务时),因为 itemReader 已经锁定了表。 ItemReader 输入由此查询定义:

SELECT ... FOR UPDATE SKIP LOCKED LIMIT 500

当 ItemReader 锁定行时,如何使 ItemWriter 能够将项目写入数据库?

我尝试将自定义

PlatformTransactionManager
bean 定义为

@Bean
public PlatformTransactionManager transactionManager(final DataSource dataSource) {
    return new JdbcTransactionManager(dataSource);
}
spring-batch
1个回答
0
投票

这个和

useSharedExtendedConnection
JdbcCursorItemReader
参数有关。默认情况下,游标将在其连接中使用打开,并且不会参与为其余步骤处理启动的任何事务。因此,您面临的问题。

您需要将该参数设置为

true
并将数据源包装在 Spring Batch 提供的
ExtendedConnectionDataSourceProxy
中(如 Javadoc 中所述)。

这里是一个完整的示例(基于 Spring Batch 5.0.3 和 TestContainers,您可以添加here进行快速测试):

/*
 * Copyright 2020-2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.test.repository;

import java.util.Objects;

import javax.sql.DataSource;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.ExtendedConnectionDataSourceProxy;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.jdbc.support.JdbcTransactionManager;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * @author Mahmoud Ben Hassine
 */
@Testcontainers(disabledWithoutDocker = true)
@SpringJUnitConfig
class PostgreSQLJobRepositoryIntegrationTests {

    private static final DockerImageName POSTGRESQL_IMAGE = DockerImageName.parse("postgres:13.3");

    @Container
    public static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>(POSTGRESQL_IMAGE);

    @Autowired
    private DataSource dataSource;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    @BeforeEach
    void setUp() {
        ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
        databasePopulator.addScript(new ClassPathResource("/org/springframework/batch/core/schema-postgresql.sql"));
        databasePopulator.execute(this.dataSource);
        jdbcTemplate.execute("create table person (id bigint not null primary key, name varchar not null, processed boolean)");
        jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo1', false)");
        jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'FOO2', true)");
        jdbcTemplate.execute("insert into person (id, name, processed) values (3, 'bar1', false)");
        jdbcTemplate.execute("insert into person (id, name, processed) values (4, 'BAR2', true)");
    }

    @Test
    void testJobExecution() throws Exception {
        // given
        JobParameters jobParameters = new JobParametersBuilder().toJobParameters();

        // when
        JobExecution jobExecution = this.jobLauncher.run(this.job, jobParameters);

        // then
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
        jdbcTemplate
                .query("select * from person", new DataClassRowMapper<>(Person.class))
                .forEach(person -> System.out.println("person = " + person));
    }

    @Configuration
    @EnableBatchProcessing
    static class TestConfiguration {

        @Bean
        public DataSource dataSource() throws Exception {
            PGSimpleDataSource datasource = new PGSimpleDataSource();
            datasource.setURL(postgres.getJdbcUrl());
            datasource.setUser(postgres.getUsername());
            datasource.setPassword(postgres.getPassword());
            return new ExtendedConnectionDataSourceProxy(datasource);
        }

        @Bean
        public JdbcTransactionManager transactionManager(DataSource dataSource) {
            return new JdbcTransactionManager(dataSource);
        }

        @Bean
        public JdbcTemplate jdbcTemplate(DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }

        @Bean
        public JdbcCursorItemReader<Person> itemReader(DataSource dataSource) {
            String sql = "select * from person where processed = false for update skip locked limit 2";
            return new JdbcCursorItemReaderBuilder<Person>()
                    .name("personItemReader")
                    .dataSource(dataSource)
                    .sql(sql)
                    .beanRowMapper(Person.class)
                    .useSharedExtendedConnection(true)
                    .build();
        }

        @Bean
        public ItemProcessor<Person, Person> itemProcessor() {
            return person -> new Person(person.getId(), person.getName().toUpperCase(), true);
        }

        @Bean
        public JdbcBatchItemWriter<Person> itemWriter(DataSource dataSource) {
            String sql = "update person set name = :name, processed = :processed where id = :id";
            return new JdbcBatchItemWriterBuilder<Person>()
                    .dataSource(dataSource)
                    .sql(sql)
                    .beanMapped()
                    .build();
        }

        @Bean
        public Job job(JobRepository jobRepository, JdbcTransactionManager transactionManager,
                       JdbcCursorItemReader<Person> itemReader, ItemProcessor<Person, Person> itemProcessor, JdbcBatchItemWriter<Person> itemWriter) {
            return new JobBuilder("job", jobRepository)
                .start(new StepBuilder("step", jobRepository)
                    .<Person, Person>chunk(2, transactionManager)
                        .reader(itemReader)
                        .processor(itemProcessor)
                        .writer(itemWriter)
                    .build())
                .build();
        }

    }

    public static final class Person {
        private int id;
        private String name;
        private boolean processed;

        public Person() {
        }

        public Person(int id, String name, boolean processed) {
            this.id = id;
            this.name = name;
            this.processed = processed;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public boolean isProcessed() {
            return processed;
        }

        public void setProcessed(boolean processed) {
            this.processed = processed;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) return true;
            if (obj == null || obj.getClass() != this.getClass()) return false;
            var that = (Person) obj;
            return this.id == that.id &&
                    Objects.equals(this.name, that.name) &&
                    this.processed == that.processed;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name, processed);
        }

        @Override
        public String toString() {
            return "Person[" +
                    "id=" + id + ", " +
                    "name=" + name + ", " +
                    "processed=" + processed + ']';
        }
    }

}

请注意,当未设置

useSharedExtendedConnection
并且数据源未包装在
ExtendedConnectionDataSourceProxy
中时,示例如何按照您的描述挂起。

© www.soinside.com 2019 - 2024. All rights reserved.