我正在编写一个通用适配器,该适配器从EMS获取消息并推送到Kafka主题中。如果配置文件中将特定标志设置为true,则必须丰富标题。标头的表达也将存在于配置文件中
return IntegrationFlows.from(org.springframework.integration.jms.dsl.Jms
.messageDrivenChannelAdapter(org.springframework.integration.jms.dsl.Jms
.container(this.queueConnFactory, this.queue)
.transactionManager(transactionManager()).get()))
.enrichHeaders(
h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "headers['flightNbr']"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(kafkaTopic))
.get();
我知道如何从收到的消息中添加标题,但是我们如何有条件地添加标题?同样,如果该标志为假,将不应用标头。
有一个headerFunction(String name, Function<Message<P>, ?> function)
而不是那个headerExpression()
。因此,类似这样的方法应该对您有用:
.enrichHeaders(h ->
h.headerFunction(KafkaHeaders.MESSAGE_KEY, m -> {
if (SOME_STATE) {
return m.getHeaders().get("flightNbr");
}
else {
return null;
}
}))