我需要流式传输文件并逐行处理行。 在此过程中,我需要跟踪成功处理的生产线和失败的处理线,并为每个拆分发布这些指标。
Camel split 将在每个 split 上创建新的交换,因此标头将丢失,并且聚合在这里不起作用。因为结果将在拆分结束时访问。
如何使用camel解决这个问题?
示例路线:
onException(Exception.class)
.handled(true)
.process(new FailureProcessor()) // increment failureCount
.to("{{publishMetrics}}");
from("{{file}}")
.setHeader("successCount", constant(0))
.setHeader("failureCount", constant(0))
.split(body().tokenize("\n"))
.process(new MyProcessor()) // Process the lines
.process(new SuccessProcessor()) // Increment the successCount header
.to("{{publishMetrics}}") // Publish the metrics(successCount & failureCount)
.end()
.end(); // split ends
您可以使用自定义聚合策略将拆分结果合并为一条摘要消息,然后您可以发布该消息。
参见 https://camel.apache.org/components/4.4.x/eips/split-eip.html#_aggregating