Spring Integration File Support中的条件文件移动

问题描述 投票:0回答:1

我正在实现以下一个Spring Integration工作流程。

IntegrationFlows.from("inputFileProcessorChannel")
                           .split(fileSplitterSpec, spec -> {})
                           .transform(lineItemTransformer)
                           .handle(httpRequestExecutingMessageHandler)
                           .transform(reportDataAggregator)
                           .aggregate(aggregatorSpec -> aggregatorSpec.requiresReply(false))
                           .channel("reportGeneratorChannel")
                           .get();

现在,完成上述流程后,我需要将input file移至存档目录。决定目标目录的决定基于消息头processingFailed,并且在流程的.transform(reportDataAggregator)步骤中添加此头。要移动此文件,我创建了另一个流程,如下面的代码所示

 IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
                           .routeToRecipients(routerSpec -> {
                               routerSpec.recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
                                         .recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE));
                           })

                           .get();  

选择器方法

 private MessageSelector createMessageSelector(Boolean ruleBoolean) {
    return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
}

下面的报告频道流

IntegrationFlows.from("reportGeneratorChannel")
                           .transform(executionReportTransformer)
                           .handle(reportWritingMessageHandlerSpec)
                           .get();

但是,正如该流程所预期的那样,由于在流执行中不存在所述头,因此文件移动未完成。

因此,如何在创建报告文件后实现执行file mover flow的目标?

spring spring-integration spring-integration-dsl
1个回答
0
投票

FileSplitter为我们填充要产生的每一行的标题:

@Override
protected boolean willAddHeaders(Message<?> message) {
    Object payload = message.getPayload();
    return payload instanceof File || payload instanceof String;
}

@Override
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
    File file = null;
    if (message.getPayload() instanceof File) {
        file = (File) message.getPayload();
    }
    else if (message.getPayload() instanceof String) {
        file = new File((String) message.getPayload());
    }
    if (file != null) {
        if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
            headers.put(FileHeaders.ORIGINAL_FILE, file);
        }
        if (!headers.containsKey(FileHeaders.FILENAME)) {
            headers.put(FileHeaders.FILENAME, file.getName());
        }
    }
}

因此,即使您已完成聚合并准备向该.channel("reportGeneratorChannel")发送消息,您仍然可以访问那些与文件相关的标头。

将此reportGeneratorChannel设置为PublishSubscribeChannel,然后将“文件移动器流”移到那里,将为您解决问题。

顺便说一句:到目前为止,在同一个通道上使用IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))和第二个流时,您将进行循环调度。那不是发布订阅分发。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel

© www.soinside.com 2019 - 2024. All rights reserved.