那么,
如果有一个 Spring Integration 5.5 / Spring Boot 2.7 流程,可以从 SFTP 源读取 XML 文件,处理并存储它们,然后删除。事情确实有效,但是是按顺序进行的,这意味着每个文件都会在另一个文件之后被处理。
我想异步进行处理和转换(不需要并行,顺序不重要)以增加吞吐量。但我不知道如何配置它。
代码:
IntegrationFlows
.from(Sftp.inboundStreamingAdapter(sftpTemplate))
.publishSubscribeChannel(spec -> spec
.subscribe(
flow ->
flow
.transform( /* file content to xml */ )
.handle( /* persist as xml */ )
)
.subscribe(flow -> flow.handle( /* remove file */ ))
)
.get()
我猜异步/并行处理必须在源和发布/子通道之间定义。但如何呢?
您的
publishSubscribeChannel()
配置是正确的,不要尝试使其并行,因为您无法删除(甚至关闭)远程文件,因为您在 InputStream
之后处理有效负载中的 Sftp.inboundStreamingAdapter()
。为了使其从源角度平行,您需要研究 IntegrationFlows.from()
: 的第二个参数
static IntegrationFlowBuilder from(MessageSource<?> messageSource,
@Nullable Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {
Consumer<SourcePollingChannelAdapterSpec>
使您可以访问源轮询通道适配器选项。其中之一是 poller(Function<PollerFactory, PollerSpec> pollers)
,可以进行如下配置:
e -> e.poller(p -> p.fixedDelay(1000).taskExecutor())
taskExecutor
允许将计划任务转移到不同的线程。请小心 maxMessagesPerPoll
(1
默认为 SourcePollingChannelAdapter
):如果超过 1
,那么该数量的消息将在同一个线程中发出。
您也可以在此
channel(c -> c.executor())
之后简单地添加 from()
。