PollableChannel 默认情况下是同步的

问题描述 投票:0回答:1

使用 Spring 集成,我通过

QueueChannel
从用 Spring 的
@ScheduledService
注释的方法发送消息到用
@ServiceActivator
注释的方法。我期望
@ServiceActivator
带注释的方法默认在单独的线程中执行,但事实似乎并非如此。

@Configuration
public class SpecificIntegrationConfig {

    @Bean
    public MessageChannel dirListingChannel() {
        return new QueueChannel(5);
    }
}

我使用

MessageGateway
发送消息:

@MessagingGateway
public interface MessageGateway {

    @Gateway(requestChannel = "dirListingChannel")
    void sendListing(List<DirEntry> entries);
}

发送消息(远程目录中的文件列表):

@RequiredArgsConstructor
public class SftpPollerComponent {
    private final SftpClient sftpClient;
    private final MessageGateway messageGateway;
    @Value("${remote.dir}")
    private final String dir;

    @Scheduled(fixedDelay = 2_000)
    public void pollRemote() throws IOException {
        final ArrayList<SftpClient.DirEntry> dirEntries = new ArrayList<>(20);
        for (final SftpClient.DirEntry entry : this.sftpClient.readDir(this.dir)) {
            dirEntries.add(entry);
        }
        log.info("Sender thread: {}", Thread.currentThread().getName());
        this.messageGateway.sendListing(dirEntries);
    }

}

以及消息的接收:

@Component
public class DirEntryService {

    @ServiceActivator(inputChannel = "dirListingChannel")
    public void handleFiles(final List<DirEntry> files) throws InterruptedException {
        log.info("Receiving thread name: {}", Thread.currentThread().getName());
        log.info("Sleeping..");
        Thread.sleep(6000);
        log.info("Service awake.");
    }
}

我在日志记录中看到的是,接收方法与发送者/

@Scheduled
方法在同一线程上执行,并且接收者中强制执行的
Thread.sleep(6000)
会阻止发送者。也就是说,整个过程看起来好像是同步的。

2024-11-28T17:45:31.503Z  INFO 12987 --- [   scheduling-1] c.e.i.i.component.SftpPollerComponent    : Sender thread: scheduling-1
2024-11-28T17:45:32.478Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Receiving thread name: scheduling-1
2024-11-28T17:45:32.478Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : There are 10 files
2024-11-28T17:45:32.479Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Sleeping..
2024-11-28T17:45:38.479Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Service awake.
2024-11-28T17:45:39.496Z  INFO 12987 --- [   scheduling-1] c.e.i.i.component.SftpPollerComponent    : Sender thread: scheduling-1
2024-11-28T17:45:40.480Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Receiving thread name: scheduling-1
2024-11-28T17:45:40.481Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : There are 10 files
2024-11-28T17:45:40.481Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Sleeping..
2024-11-28T17:45:46.481Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Service awake.
2024-11-28T17:45:47.497Z  INFO 12987 --- [   scheduling-1] c.e.i.i.component.SftpPollerComponent    : Sender thread: scheduling-1
2024-11-28T17:45:48.482Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Receiving thread name: scheduling-1
2024-11-28T17:45:48.482Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : There are 10 files
2024-11-28T17:45:48.482Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Sleeping..

假设,通过默认指定

QueueChannel
,Spring 将自动安排队列轮询并使用另一个线程。也根据Spring自己的docs

如果您希望轮询是异步的,轮询器可以选择指定指向任何 TaskExecutor bean 的现有实例的任务执行器属性

这似乎表明我观察到的行为是默认行为,在我看来,如果默认情况下接收将阻止发送者,则这似乎与具有可轮询队列的想法相反。

spring spring-integration
1个回答
0
投票

@Scheduled
AbstractPollingEndpoint
使用 Spring Boot 自动配置中相同的共享
ThreadPoolTaskScheduler
。默认情况下,该产品带有
1
螺纹。这就是为什么你总是只看到那个
scheduling-1
的名字。另外,如果您阻止该线程,则计划的所有内容也将被阻止。在文档中查看更多信息:

https://docs.spring.io/spring-boot/reference/features/task-execution-and-scheduling.html

https://docs.spring.io/spring-boot/reference/messaging/spring-integration.html

我相信它已经被默认设置为

1
,因为Spring Boot的目的是开发微服务。因此,我们从一开始就使用最少的资源,您可以根据需要自由调整。

© www.soinside.com 2019 - 2024. All rights reserved.