I am using KafkaJS to produce messages to a Spring Boot application whose listener is implemented with a reply template:
@KafkaListener(topics = KHQRTopic.GENERATE_KHQR, groupId = KafkaConfiguration.CONSUMER_GROUP)
@SendTo
public Message<?> onEventGenerateKHQR(ConsumerRecord<String, KHQRRequest> consumerRecord) {
    log.info("Received event {}", consumerRecord.value());
    ...
}
On the Node.js side, I send the request and get the result from Kafka like this:
export const getResult = async <T>(messageOption: MessageOption, data: any) => {
  const processID = new Date().getTime().toString()
  let payload: string | Buffer = ''
  if (messageOption.avroSchemaName) {
    payload = await registry.encode(
      getSchemaId(messageOption.avroSchemaName),
      data
    );
  } else {
    payload = JSON.stringify(data)
  }
  const replyTopic = messageOption.replyTopic ?? `${messageOption.sendTopic}.reply`
  console.log('wait result form', replyTopic)
  await producer.send({
    topic: messageOption.sendTopic,
    messages: [
      {
        key: processID,
        value: payload,
        headers: {
          'reply-topic': replyTopic
        }
      }
    ],
  });
  return await getMessageResponse<T>(
    replyTopic,
    processID,
    messageOption.timer
  );
}
When the message is sent, the consumer in the Spring application fails with this error:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2946) ~[spring-kafka-3.0.12.jar:3.0.12]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2887) ~[spring-kafka-3.0.12.jar:3.0.12]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2854) ~[spring-kafka-3.0.12.jar:3.0.12]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$57(KafkaMessageListenerContainer.java:2772) ~[spring-kafka-3.0.12.jar:3.0.12]
at io.micrometer.observation.Observation.lambda$observe$4(Observation.java:544) ~[micrometer-observation-1.11.5.jar:1.11.5]
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.5.jar:1.11.5]
at io.micrometer.observation.Observation.observe(Observation.java:544) ~[micrometer-observation-1.11.5.jar:1.11.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2770) ~[spring-kafka-3.0.12.jar:3.0.12]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2622) ~[spring-kafka-3.0.12.jar:3.0.12]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2508) ~[spring-kafka-3.0.12.jar:3.0.12]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2150) ~[spring-kafka-3.0.12.jar:3.0.12]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1505) ~[spring-kafka-3.0.12.jar:3.0.12]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1469) ~[spring-kafka-3.0.12.jar:3.0.12]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1344) ~[spring-kafka-3.0.12.jar:3.0.12]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.lang.IllegalStateException: With no topic header, a defaultTopic is required
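It looks like the headers do not match. Which header does Spring read the reply topic from when a reply template is used?
What should the message payload look like so that these two applications can communicate?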
See the documentation: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/annotation-send-to.html
@SendTo (no properties): This is treated as
!{source.headers['kafka_replyTopic']}
So the header that carries the reply topic must be named kafka_replyTopic, not reply-topic as in the 'reply-topic': replyTopic entry in your JS code.
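As a minimal sketch of that fix, the message can be built with the header name a bare @SendTo resolves. The helper buildRequestMessage, the RequestMessage interface, and the sample topic and payload values are hypothetical and only illustrate the shape; the one detail taken from the documentation is the header name kafka_replyTopic.

```typescript
// Header name that a bare @SendTo resolves, per the Spring Kafka documentation.
const REPLY_TOPIC_HEADER = 'kafka_replyTopic';

// Local, simplified message shape (an assumption) so the sketch is self-contained.
// A Node.js Buffer is a Uint8Array, so it fits the value type.
interface RequestMessage {
  key: string;
  value: string | Uint8Array;
  headers: Record<string, string>;
}

// Hypothetical helper: builds the request message with the reply topic
// placed in the header Spring looks for.
function buildRequestMessage(
  processID: string,
  payload: string | Uint8Array,
  replyTopic: string
): RequestMessage {
  return {
    key: processID,
    value: payload,
    headers: { [REPLY_TOPIC_HEADER]: replyTopic },
  };
}

// Sample values, for illustration only.
const message = buildRequestMessage(
  '1700000000000',
  JSON.stringify({ amount: 1 }),
  'generate-khqr.reply'
);
console.log(message.headers);
```

In getResult, the resulting object would take the place of the inline message literal, for example `await producer.send({ topic: messageOption.sendTopic, messages: [message] })`.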