使用 Spring 集成,我通过
QueueChannel
从用 Spring 的 @ScheduledService
注释的方法发送消息到用 @ServiceActivator
注释的方法。我期望 @ServiceActivator
带注释的方法默认在单独的线程中执行,但事实似乎并非如此。
@Configuration
public class SpecificIntegrationConfig {
@Bean
public MessageChannel dirListingChannel() {
return new QueueChannel(5);
}
}
我使用
MessageGateway
发送消息:
@MessagingGateway
public interface MessageGateway {
@Gateway(requestChannel = "dirListingChannel")
void sendListing(List<DirEntry> entries);
}
发送消息(远程目录中的文件列表):
@RequiredArgsConstructor
public class SftpPollerComponent {
private final SftpClient sftpClient;
private final MessageGateway messageGateway;
@Value("${remote.dir}")
private final String dir;
@Scheduled(fixedDelay = 2_000)
public void pollRemote() throws IOException {
final ArrayList<SftpClient.DirEntry> dirEntries = new ArrayList<>(20);
for (final SftpClient.DirEntry entry : this.sftpClient.readDir(this.dir)) {
dirEntries.add(entry);
}
log.info("Sender thread: {}", Thread.currentThread().getName());
this.messageGateway.sendListing(dirEntries);
}
}
以及消息的接收:
@Component
public class DirEntryService {
@ServiceActivator(inputChannel = "dirListingChannel")
public void handleFiles(final List<DirEntry> files) throws InterruptedException {
log.info("Receiving thread name: {}", Thread.currentThread().getName());
log.info("Sleeping..");
Thread.sleep(6000);
log.info("Service awake.");
}
}
我在日志记录中看到的是,接收方法与发送者/
@Scheduled
方法在同一线程上执行,并且接收者中强制执行的Thread.sleep(6000)
会阻止发送者。也就是说,整个过程看起来好像是同步的。
2024-11-28T17:45:31.503Z INFO 12987 --- [ scheduling-1] c.e.i.i.component.SftpPollerComponent : Sender thread: scheduling-1
2024-11-28T17:45:32.478Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Receiving thread name: scheduling-1
2024-11-28T17:45:32.478Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : There are 10 files
2024-11-28T17:45:32.479Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Sleeping..
2024-11-28T17:45:38.479Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Service awake.
2024-11-28T17:45:39.496Z INFO 12987 --- [ scheduling-1] c.e.i.i.component.SftpPollerComponent : Sender thread: scheduling-1
2024-11-28T17:45:40.480Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Receiving thread name: scheduling-1
2024-11-28T17:45:40.481Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : There are 10 files
2024-11-28T17:45:40.481Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Sleeping..
2024-11-28T17:45:46.481Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Service awake.
2024-11-28T17:45:47.497Z INFO 12987 --- [ scheduling-1] c.e.i.i.component.SftpPollerComponent : Sender thread: scheduling-1
2024-11-28T17:45:48.482Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Receiving thread name: scheduling-1
2024-11-28T17:45:48.482Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : There are 10 files
2024-11-28T17:45:48.482Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Sleeping..
我假设,通过默认指定
QueueChannel
,Spring 将自动安排队列轮询并使用另一个线程。也根据Spring自己的docs:
如果您希望轮询是异步的,轮询器可以选择指定指向任何 TaskExecutor bean 的现有实例的任务执行器属性
这似乎表明我观察到的行为是默认行为,在我看来,如果默认情况下接收将阻止发送者,则这似乎与具有可轮询队列的想法相反。
@Scheduled
和 AbstractPollingEndpoint
使用 Spring Boot 自动配置中相同的共享 ThreadPoolTaskScheduler
。默认情况下,该产品带有 1
螺纹。这就是为什么你总是只看到那个 scheduling-1
的名字。另外,如果您阻止该线程,则计划的所有内容也将被阻止。在文档中查看更多信息:
https://docs.spring.io/spring-boot/reference/features/task-execution-and-scheduling.html
https://docs.spring.io/spring-boot/reference/messaging/spring-integration.html
我相信它已经被默认设置为
1
,因为Spring Boot的目的是开发微服务。因此,我们从一开始就使用最少的资源,您可以根据需要自由调整。