Spring集成使异步网关等待聚合结果

问题描述 投票:0回答:2

我正在使用spring集成来处理文件。我想逐行读取文件,过滤,转换并发送到kafka。通过异步网关传递文件进行处理。在处理结束时,我出于以下两个目的添加了聚合器:

  • 我想等到整个文件处理完毕。也就是说,所有经过正确处理的行都已发送到kafka,所有导致错误的行都由从错误通道读取的流处理。
  • 作为网关的返回值,我希望接收汇总结果以进行文件处理。从文件读取了多少行,处理了多少行而没有错误,等等。

到目前为止,我已经创建了一个聚合器,等待文件标记START和END +所有已处理的行。我已经创建了自定义消息组,该组计算已处理的行,还存储了从文件START标记消息中获取的replyChannel和从文件END标记消息中获取的行数。该组还设置了5秒超时。组完成后,它会发出带有所需统计信息的消息,并带有从START标记获取的值的replyChannel标头。我的理解是网关正在等待对此replyChannel的响应。这是消息组的代码:

public void add(Message<?> messageToAdd) {
        if(messageToAdd.getPayload() instanceof FileMarker) {
            FileMarker marker = (FileMarker) messageToAdd.getPayload();
            switch (marker.getMark()) {
                case START:
                    replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
                    break;
                case END:
                    expectedRowCount.set(marker.getLineCount());
                    break;
                default:
                    throw new IllegalStateException("Unexpected file mark");
            }
        } else {
            ProcessingResult processingResult = messageToAdd.getHeaders()
                    .get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
            assert processingResult != null;
            rowCount.incrementAndGet();
            switch (processingResult) {
                case FILTERED:
                    filteredCount.incrementAndGet();
                    break;
                case PROCESSED:
                    processedCount.incrementAndGet();
                    break;
                case PROCESSING_ERROR:
                    processingErrorCount.incrementAndGet();
                    break;
                case KAFKA_ERROR:
                    kafkaErrorCount.incrementAndGet();
                    break;
                default:
                    throw new IllegalStateException("Unrecognized processing result: " + processingResult);
            }
        }
    }

我的问题是我的网关永远不会收到响应,并且会无限期地等待。 由于网关处理,如何使异步网关等待聚合器发出其消息并接收此消息的有效载荷?

可以在此处找到重新创建问题的测试,https://github.com/hawk1234/spring-integration-example提交9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4

。运行测试时,应用程序日志位于路径build / logs / spring-integration-example.log下。当前测试由于网关从未收到响应而挂起。此外,整个流程仍在进行中,因此组仅在超时后释放。

我正在使用spring集成来处理文件。我想逐行读取文件,过滤,转换并发送到kafka。通过异步网关传递文件进行处理。在处理结束时,I'...

java spring spring-integration spring-integration-dsl
2个回答
0
投票

我已经创建了自定义消息组


0
投票

我设法解决了这个问题。我必须使用方法MessageHeaders.REPLY_CHANNEL手动将汇总消息发送到BaseIntegrationFlowDefinition#logAndReply。修订以及完成的示例可通过commit eefd5f472891ded39f363ff71ec530ada800f704

© www.soinside.com 2019 - 2024. All rights reserved.