从Kafka获取回复消息

问题描述 投票:0回答:1

我正在使用 Kafkajs 向 Spring Boot 应用程序生成消息,该应用程序实现为回复模板

@KafkaListener(topics = KHQRTopic.GENERATE_KHQR, groupId = KafkaConfiguration.CONSUMER_GROUP)
@SendTo
public Message<?> onEventGenerateKHQR(ConsumerRecord<String, KHQRRequest> consumerRecord) {
        log.info("Received event {}", consumerRecord.value());
...
}

在我的node.js 上,我从 Kafka 得到了这样的结果

export const getResult = async <T>(messageOption: MessageOption, data: any) => {
  const processID = new Date().getTime().toString()
  let payload: string | Buffer = ''
  
    if (messageOption.avroSchemaName) {
      payload = await registry.encode(
        getSchemaId(messageOption.avroSchemaName),
        data
      );
    } else {
      payload = JSON.stringify(data)
    }
    
  
    const replyTopic = messageOption.replyTopic ?? `${messageOption.sendTopic}.reply`
    console.log('wait result form', replyTopic)
    await producer.send({
      topic: messageOption.sendTopic,
      messages: [
        { key: processID, value: payload,
          headers: {
          'reply-topic': replyTopic
      } }],
      
    });

    return await getMessageResponse<T>(
      replyTopic,
      processID,
      messageOption.timer
    );
}

发送消息时,我在 Spring 应用程序的消费者上遇到错误


org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2946) ~[spring-kafka-3.0.12.jar:3.0.12]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2887) ~[spring-kafka-3.0.12.jar:3.0.12]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2854) ~[spring-kafka-3.0.12.jar:3.0.12]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$57(KafkaMessageListenerContainer.java:2772) ~[spring-kafka-3.0.12.jar:3.0.12]
    at io.micrometer.observation.Observation.lambda$observe$4(Observation.java:544) ~[micrometer-observation-1.11.5.jar:1.11.5]
    at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.5.jar:1.11.5]
    at io.micrometer.observation.Observation.observe(Observation.java:544) ~[micrometer-observation-1.11.5.jar:1.11.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2770) ~[spring-kafka-3.0.12.jar:3.0.12]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2622) ~[spring-kafka-3.0.12.jar:3.0.12]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2508) ~[spring-kafka-3.0.12.jar:3.0.12]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2150) ~[spring-kafka-3.0.12.jar:3.0.12]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1505) ~[spring-kafka-3.0.12.jar:3.0.12]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1469) ~[spring-kafka-3.0.12.jar:3.0.12]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1344) ~[spring-kafka-3.0.12.jar:3.0.12]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.lang.IllegalStateException: With no topic header, a defaultTopic is required

标题似乎不匹配。在 Spring 上使用回复模板时,回复主题接受哪些标头?

消息的有效负载应该是什么才能在这两个应用程序之间进行通信?

node.js spring-cloud spring-kafka
1个回答
0
投票

参见文档:https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/annotation-send-to.html

@SendTo
(无属性):这被视为
!{source.headers['kafka_replyTopic']}

因此,包含回复主题的标头名称必须是

kafka_replyTopic
,而不是您在 JS 代码上使用
'reply-topic': replyTopic
所做的事情。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.