使用SftpRemoteFileTemplate从远程服务器删除文件

问题描述 投票:0回答:1

我正在尝试在处理文件并使用

SftpRemoteFileTemplate
将其放置在 kafka 主题上后删除该文件。我让它使用
SftpOutboundGateway
工作,但根据 EIS 模式,我们应该使用模板。我目前收到一个异常,表示已达到流程末尾。

例外:

Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$Lambda$628/0x000000080104ab28@748f93bb) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
    at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.registerOutputChannelIfCan(BaseIntegrationFlowDefinition.java:3109) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.channel(BaseIntegrationFlowDefinition.java:275) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.channel(BaseIntegrationFlowDefinition.java:243) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.nullChannel(BaseIntegrationFlowDefinition.java:2919) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow.intermediateDeleteFlow(AcousticToKafkaIntegrationFlow.java:95) ~[classes/:na]
    at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$0.CGLIB$intermediateDeleteFlow$5(<generated>) ~[classes/:na]
    at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$2.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:258) ~[spring-core-6.0.11.jar:6.0.11]
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-6.0.11.jar:6.0.11]
    at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$0.intermediateDeleteFlow(<generated>) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:139) ~[spring-beans-6.0.11.jar:6.0.11]
    ... 18 common frames omitted

代码尝试从初始消息中提取标头,以创建远程服务器上文件的路径,以便

SftpRemoteFileTemplate
可以调用其上的
remove()
方法。

路由通道流程:

                              .split(Files.splitter()
                                          .markers(true)
                                          .charset(StandardCharsets.UTF_8)
                                          .firstLineAsHeader("myHeaders")
                                          .applySequence(true))
                              .filter("!payload.contains('\"mark\":\"START\"')", p -> p.discardFlow(df -> df.channel("logStartOfFile")))
                              .log(LoggingHandler.Level.DEBUG, Constants.LOG_FLOW_CATEGORY, "payload")
                              .route("payload.contains('\"mark\":\"END\"')", r -> r.channelMapping("true", "deleteFileChannel")
                                                                                   .channelMapping("false", "publisherChannel"))
                              .get();
    }

删除文件流:

    @Bean
    public IntegrationFlow deleteFileFlow(SftpRemoteFileTemplate remoteFileTemplate, MessageChannel deleteFileChannel) {
        return IntegrationFlow.from(deleteFileChannel)
                              .transform((GenericTransformer<Message, String>) message -> {
                                  var headers = message.getHeaders();
                                  return headers.get(FileHeaders.REMOTE_DIRECTORY) + File.separator + headers.get(FileHeaders.REMOTE_FILE);
                              })
                .handle(path -> remoteFileTemplate.remove(String.valueOf(path.getPayload())))
                              .nullChannel();
    }

异常在

.nullChannel()
方法上抛出。

任何见解都会有所帮助,谢谢。

java spring-integration spring-dsl
1个回答
0
投票

查看

handle(MessageHandler)
合同。就这么简单:

@FunctionalInterface
public interface MessageHandler {

    void handleMessage(Message<?> message) throws MessagingException;

}

注意

void
返回类型。因此,这意味着该处理程序不会生成回复消息。因此无法在此端点上配置输出通道。
nullChannel()
仍然是一个通道,根据消息传递配置,该通道被设置为该端点的输出通道,但由于其处理程序是
void
,因此该配置将被拒绝并显示该错误消息。

因此,您只需在流程末尾删除

nullChannel()
即可。使用
get()
完成流程配置 - 并且您可以将该流程作为单向。

© www.soinside.com 2019 - 2024. All rights reserved.