如何解决`RotatingServerAdvice`多次取不到文件的问题?

问题描述 投票:1回答:1

我使用的是 IntegrationFlow 作为 Sftp入站DSL配置 在我使用 CustomTriggerAdvice 来处理手动触发。请看下面的代码片段作为参考。

我也使用了 RotatingServerAdvice 用于处理同一主机中的多个路径。

但当我启动 Sftp入站 它第一次从每个路径中获取文件,但第二次及以后就不工作了。Sftp Inbound Starts但不能从路径中获取文件。 我不知道问题出在哪里。有什么地方是我遗漏的吗?

SftpConfiguration

public IntegrationFlow fileFlow() {
    SftpInboundChannelAdapterSpec spec = Sftp
            .inboundAdapter(dSF())
            .preserveTimestamp(true)
            .remoteDirectory(".")
            .autoCreateLocalDirectory(true)
            .deleteRemoteFiles(false)
            .localDirectory(new File(getDestinationLocation()));

    return IntegrationFlows
            .from(spec, e -> e.id(BEAN_ID)
                    .autoStartup(false)
                    .poller(Pollers
                            .fixedDelay(5000)
                            .advice(
                                    customRotatingServerAdvice(dSF()),
                                    customTriggerAdvice()
                            )
                    )
            )
            .channel(sftpReceiverChannel())
            .handle(sftpInboundMessageHandler())
            .get();
}

private MessageChannel sftpReceiverChannel() {
    return MessageChannels.direct().get();
}

... ... ...

@Bean
public RotatingServerAdvice customRotatingServerAdvice(
      DelegatingSessionFactory<LsEntry> dSF
) {
    List<String> pathList = getSourcePathList();
    for (String path : pathList) {
        keyDirectories.add(new RotationPolicy.KeyDirectory(KEY, path));
    }

    return new RotatingServerAdvice(
            dSF,
            keyDirectories
    );
}

@Bean
public CustomTriggerAdvice customTriggerAdvice() {
    return new CustomTriggerAdvice(customControlChannel(),BEAN_ID);
}


@Bean
public IntegrationFlow customControlBus() {
    return IntegrationFlows.from(customControlChannel())
            .controlBus()
            .get();
}


@Bean
public MessageChannel customControlChannel() {
    return MessageChannels.direct().get();
}

CustomTriggerAdvice

public class CustomTriggerAdvice extends AbstractMessageSourceAdvice {
    private final MessageChannel controlChannel;
    private final String BEAN_ID;

    public CustomTriggerAdvice(MessageChannel controlChannel, String beanID) {
        this.controlChannel = controlChannel;
        this.BEAN_ID = beanID;
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return true;
    }

    @Override
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
        if (result == null) {
            controlChannel.send(new GenericMessage<>("@" + BEAN_ID + ".stop()"));
        } 
        return result;
    }
}

使用MessageChannel启动Sftp Inbound。

@Qualifier("customControlChannel") MessageChannel controlChannel;


public void startSftpInbound(){
       controlChannel.send(new GenericMessage<>("@" + beanID + ".start()"));
}
java spring spring-integration dsl spring-integration-sftp
1个回答
1
投票

我需要系统按需启动,并在一个周期内完成文件的获取。如果之后没有停止,它将继续轮询,不会停止,我的系统将陷入无限循环。有什么办法可以让当RotatingServerAdvice至少从所有服务器完成一个轮询?它是否抛出任何事件或类似的东西?

你可能误解了这个逻辑。afterReceive(@Nullable Message<?> result, MessageSource<?> source) 合同。当其中一台服务器没有返回任何轮询信息时,你不能为了你的要求而停止一个通道适配器。这样你就不会给另一个服务器在下一个轮询周期中轮询的机会。

我认为你的想法是只对所有服务器进行一次迭代,然后停止。可能是独立于任何一个服务器的结果。看起来对你来说,最好的停止方式是使用一个 RotatingServerAdvice 随着 fair = true 每次都要移动到下一个服务器。停止可以从自定义的 afterReceive() 独立于结果,当你看到 RotationPolicy.getCurrent() 等于列表中的最后一个。所以,这样你就会迭代所有的人,并在下一个polaring循环中停止移动第一个人。

© www.soinside.com 2019 - 2024. All rights reserved.