我使用的是 IntegrationFlow
作为 Sftp入站DSL配置 在我使用 CustomTriggerAdvice
来处理手动触发。请看下面的代码片段作为参考。
我也使用了 RotatingServerAdvice
用于处理同一主机中的多个路径。
但当我启动 Sftp入站 它第一次从每个路径中获取文件,但第二次及以后就不工作了。Sftp Inbound Starts但不能从路径中获取文件。 我不知道问题出在哪里。有什么地方是我遗漏的吗?
SftpConfiguration
public IntegrationFlow fileFlow() {
SftpInboundChannelAdapterSpec spec = Sftp
.inboundAdapter(dSF())
.preserveTimestamp(true)
.remoteDirectory(".")
.autoCreateLocalDirectory(true)
.deleteRemoteFiles(false)
.localDirectory(new File(getDestinationLocation()));
return IntegrationFlows
.from(spec, e -> e.id(BEAN_ID)
.autoStartup(false)
.poller(Pollers
.fixedDelay(5000)
.advice(
customRotatingServerAdvice(dSF()),
customTriggerAdvice()
)
)
)
.channel(sftpReceiverChannel())
.handle(sftpInboundMessageHandler())
.get();
}
private MessageChannel sftpReceiverChannel() {
return MessageChannels.direct().get();
}
... ... ...
@Bean
public RotatingServerAdvice customRotatingServerAdvice(
DelegatingSessionFactory<LsEntry> dSF
) {
List<String> pathList = getSourcePathList();
for (String path : pathList) {
keyDirectories.add(new RotationPolicy.KeyDirectory(KEY, path));
}
return new RotatingServerAdvice(
dSF,
keyDirectories
);
}
@Bean
public CustomTriggerAdvice customTriggerAdvice() {
return new CustomTriggerAdvice(customControlChannel(),BEAN_ID);
}
@Bean
public IntegrationFlow customControlBus() {
return IntegrationFlows.from(customControlChannel())
.controlBus()
.get();
}
@Bean
public MessageChannel customControlChannel() {
return MessageChannels.direct().get();
}
CustomTriggerAdvice
public class CustomTriggerAdvice extends AbstractMessageSourceAdvice {
private final MessageChannel controlChannel;
private final String BEAN_ID;
public CustomTriggerAdvice(MessageChannel controlChannel, String beanID) {
this.controlChannel = controlChannel;
this.BEAN_ID = beanID;
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return true;
}
@Override
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
if (result == null) {
controlChannel.send(new GenericMessage<>("@" + BEAN_ID + ".stop()"));
}
return result;
}
}
使用MessageChannel启动Sftp Inbound。
@Qualifier("customControlChannel") MessageChannel controlChannel;
public void startSftpInbound(){
controlChannel.send(new GenericMessage<>("@" + beanID + ".start()"));
}
我需要系统按需启动,并在一个周期内完成文件的获取。如果之后没有停止,它将继续轮询,不会停止,我的系统将陷入无限循环。有什么办法可以让当RotatingServerAdvice至少从所有服务器完成一个轮询?它是否抛出任何事件或类似的东西?
你可能误解了这个逻辑。afterReceive(@Nullable Message<?> result, MessageSource<?> source)
合同。当其中一台服务器没有返回任何轮询信息时,你不能为了你的要求而停止一个通道适配器。这样你就不会给另一个服务器在下一个轮询周期中轮询的机会。
我认为你的想法是只对所有服务器进行一次迭代,然后停止。可能是独立于任何一个服务器的结果。看起来对你来说,最好的停止方式是使用一个 RotatingServerAdvice
随着 fair = true
每次都要移动到下一个服务器。停止可以从自定义的 afterReceive()
独立于结果,当你看到 RotationPolicy.getCurrent()
等于列表中的最后一个。所以,这样你就会迭代所有的人,并在下一个polaring循环中停止移动第一个人。