Spring 集成:并行处理来自 Sftp.inboundStreamingAdapter 的消息

问题描述 投票:0回答:1

那么,

如果有一个 Spring Integration 5.5 / Spring Boot 2.7 流程,可以从 SFTP 源读取 XML 文件,处理并存储它们,然后删除。事情确实有效,但是是按顺序进行的,这意味着每个文件都会在另一个文件之后被处理。

我想异步进行处理和转换(不需要并行,顺序不重要)以增加吞吐量。但我不知道如何配置它。

代码:

IntegrationFlows
  .from(Sftp.inboundStreamingAdapter(sftpTemplate))
  .publishSubscribeChannel(spec -> spec
    .subscribe(
      flow -> 
        flow
         .transform( /* file content to xml */ )
         .handle( /* persist as xml */ )
     )
    .subscribe(flow -> flow.handle( /* remove file */ ))
   )
  .get()

我猜异步/并行处理必须在源和发布/子通道之间定义。但如何呢?

java spring spring-boot spring-integration spring-integration-sftp
1个回答
0
投票

您的

publishSubscribeChannel()
配置是正确的,不要尝试使其并行,因为您无法删除(甚至关闭)远程文件,因为您在
InputStream
之后处理有效负载中的
Sftp.inboundStreamingAdapter()
。为了使其从源角度平行,您需要研究
IntegrationFlows.from()
:

的第二个参数
static IntegrationFlowBuilder from(MessageSource<?> messageSource,
        @Nullable Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {

Consumer<SourcePollingChannelAdapterSpec>
使您可以访问源轮询通道适配器选项。其中之一是
poller(Function<PollerFactory, PollerSpec> pollers)
,可以进行如下配置:

e -> e.poller(p -> p.fixedDelay(1000).taskExecutor())

taskExecutor
允许将计划任务转移到不同的线程。请小心
maxMessagesPerPoll
1
默认为
SourcePollingChannelAdapter
):如果超过
1
,那么该数量的消息将在同一个线程中发出。

您也可以在此

channel(c -> c.executor())
之后简单地添加
from()

© www.soinside.com 2019 - 2024. All rights reserved.