我正在使用spring集成来处理文件。我想逐行读取文件,过滤,转换并发送到kafka。通过异步网关传递文件进行处理。在处理结束时,我出于以下两个目的添加了聚合器:
到目前为止,我已经创建了一个聚合器,等待文件标记START和END +所有已处理的行。我已经创建了自定义消息组,该组计算已处理的行,还存储了从文件START标记消息中获取的replyChannel
和从文件END标记消息中获取的行数。该组还设置了5秒超时。组完成后,它会发出带有所需统计信息的消息,并带有从START标记获取的值的replyChannel
标头。我的理解是网关正在等待对此replyChannel
的响应。这是消息组的代码:
public void add(Message<?> messageToAdd) {
if(messageToAdd.getPayload() instanceof FileMarker) {
FileMarker marker = (FileMarker) messageToAdd.getPayload();
switch (marker.getMark()) {
case START:
replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
break;
case END:
expectedRowCount.set(marker.getLineCount());
break;
default:
throw new IllegalStateException("Unexpected file mark");
}
} else {
ProcessingResult processingResult = messageToAdd.getHeaders()
.get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
assert processingResult != null;
rowCount.incrementAndGet();
switch (processingResult) {
case FILTERED:
filteredCount.incrementAndGet();
break;
case PROCESSED:
processedCount.incrementAndGet();
break;
case PROCESSING_ERROR:
processingErrorCount.incrementAndGet();
break;
case KAFKA_ERROR:
kafkaErrorCount.incrementAndGet();
break;
default:
throw new IllegalStateException("Unrecognized processing result: " + processingResult);
}
}
}
我的问题是我的网关永远不会收到响应,并且会无限期地等待。 由于网关处理,如何使异步网关等待聚合器发出其消息并接收此消息的有效载荷?
可以在此处找到重新创建问题的测试,https://github.com/hawk1234/spring-integration-example提交9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4
。运行测试时,应用程序日志位于路径build / logs / spring-integration-example.log下。当前测试由于网关从未收到响应而挂起。此外,整个流程仍在进行中,因此组仅在超时后释放。我正在使用spring集成来处理文件。我想逐行读取文件,过滤,转换并发送到kafka。通过异步网关传递文件进行处理。在处理结束时,I'...
我已经创建了自定义消息组
我设法解决了这个问题。我必须使用方法MessageHeaders.REPLY_CHANNEL
手动将汇总消息发送到BaseIntegrationFlowDefinition#logAndReply
。修订以及完成的示例可通过commit eefd5f472891ded39f363ff71ec530ada800f704