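I'm trying to delete a file with SftpRemoteFileTemplate after processing it and publishing it to a Kafka topic. I have this working with SftpOutboundGateway, but per the EIS pattern we are supposed to use the template. I currently get an exception saying that the end of the flow has been reached.
Exception: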
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$Lambda$628/0x000000080104ab28@748f93bb) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.registerOutputChannelIfCan(BaseIntegrationFlowDefinition.java:3109) ~[spring-integration-core-6.1.2.jar:6.1.2]
at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.channel(BaseIntegrationFlowDefinition.java:275) ~[spring-integration-core-6.1.2.jar:6.1.2]
at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.channel(BaseIntegrationFlowDefinition.java:243) ~[spring-integration-core-6.1.2.jar:6.1.2]
at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.nullChannel(BaseIntegrationFlowDefinition.java:2919) ~[spring-integration-core-6.1.2.jar:6.1.2]
at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow.intermediateDeleteFlow(AcousticToKafkaIntegrationFlow.java:95) ~[classes/:na]
at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$0.CGLIB$intermediateDeleteFlow$5(<generated>) ~[classes/:na]
at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$2.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:258) ~[spring-core-6.0.11.jar:6.0.11]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-6.0.11.jar:6.0.11]
at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$0.intermediateDeleteFlow(<generated>) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:139) ~[spring-beans-6.0.11.jar:6.0.11]
... 18 common frames omitted
The code extracts headers from the original message to build the path of the file on the remote server, so that SftpRemoteFileTemplate can call remove() on it.
Routing channel flow:
.split(Files.splitter()
.markers(true)
.charset(StandardCharsets.UTF_8)
.firstLineAsHeader("myHeaders")
.applySequence(true))
.filter("!payload.contains('\"mark\":\"START\"')", p -> p.discardFlow(df -> df.channel("logStartOfFile")))
.log(LoggingHandler.Level.DEBUG, Constants.LOG_FLOW_CATEGORY, "payload")
.route("payload.contains('\"mark\":\"END\"')", r -> r.channelMapping("true", "deleteFileChannel")
.channelMapping("false", "publisherChannel"))
.get();
}
Delete file flow:
@Bean
public IntegrationFlow deleteFileFlow(SftpRemoteFileTemplate remoteFileTemplate, MessageChannel deleteFileChannel) {
return IntegrationFlow.from(deleteFileChannel)
.transform((GenericTransformer<Message, String>) message -> {
var headers = message.getHeaders();
return headers.get(FileHeaders.REMOTE_DIRECTORY) + File.separator + headers.get(FileHeaders.REMOTE_FILE);
})
.handle(path -> remoteFileTemplate.remove(String.valueOf(path.getPayload())))
.nullChannel();
}
The exception is thrown at the .nullChannel() call.
Any insight would be helpful, thanks.
Look at the handle(MessageHandler) contract. It is as simple as this:
@FunctionalInterface
public interface MessageHandler {
void handleMessage(Message<?> message) throws MessagingException;
}
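Note the void return type. It means this handler does not produce a reply message, so no output channel can be configured on this endpoint. nullChannel() is still a channel, and under the messaging configuration it would be set as this endpoint's output channel. Because the handler is void, that configuration is rejected with the error message you see.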
So you just need to remove nullChannel() from the end of the flow. Finish the flow configuration with get(), and the flow will work as a one-way flow.
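As a rough sketch of that change, the delete flow from the question could end with get() instead of nullChannel(). The enclosing class name DeleteFileFlowConfig is hypothetical, and using transform(Message.class, ...) in place of the original cast is an assumption made only so that the lambda receives the whole message. The rest mirrors the code in the question.

```java
import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
public class DeleteFileFlowConfig { // hypothetical class name, not from the question

    @Bean
    public IntegrationFlow deleteFileFlow(SftpRemoteFileTemplate remoteFileTemplate,
            MessageChannel deleteFileChannel) {
        return IntegrationFlow.from(deleteFileChannel)
                // Build the remote path from the headers of the incoming message.
                .transform(Message.class, message -> {
                    var headers = message.getHeaders();
                    return headers.get(FileHeaders.REMOTE_DIRECTORY)
                            + File.separator
                            + headers.get(FileHeaders.REMOTE_FILE);
                })
                // One-way MessageHandler: the lambda is void-compatible, so the
                // boolean returned by remove() is discarded and no reply is produced.
                .handle(message -> remoteFileTemplate.remove(String.valueOf(message.getPayload())))
                // End the flow here; no output channel follows a one-way handler.
                .get();
    }
}
```

Since the handler is the last endpoint, nothing downstream sees the result of remove(). If that boolean were needed later in the flow, a reply-producing handler would have to be used instead, which is a different design from the one described here.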