如何实现ReplyingKafkaTemplate而不使用@KafkaListener和@SendTo发送响应到回复主题

问题描述 投票:0回答:1

美好的一天。 我正在尝试使用 ReplyingKafkaTemplate 实现同步请求回复消息流。作为个人 POC,它运行良好。但我的应用程序已经实现了 kafka 来手动使用消息。我正在使用 ConcurrentMessageListenerContainer,如下所示。

    @Bean
    public ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);

        return repliesContainer;
    }

使用 KafkaTemplate 的生产者

 ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
 RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
 SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
 System.out.println("Sent ok: " + sendResult.getRecordMetadata());
 ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
 System.out.println("Return value: " + consumerRecord.value());

App 有自定义 MessageListener 来消费实现 AcknowledgingMessageListener 的消息。 公共类 KafkaListener 实现

public class KafkaListener implements AcknowledgingMessageListener {
...
@Override
    public void onMessage(ConsumerRecord consumeRecord, Acknowledgment acknowledgment) {
}
}

上述实现对于发布和消费消息来说效果很好。但现在我正在尝试扩展它以支持使用 ReplyingKafkaTemplate 同步 Kafka 消息流。我将 ReplyingKafkaTemplate 添加到这个框架中。它发布消息来请求主题和侦听器消费消息。

 @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }


       public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory()           
       throws IOException {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new   
                ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        factory.setStatefulRetry(true);
        **factory.setReplyTemplate(kafkaTemplate);** // to support REPLY_TO topic for req/reply   
                pattern
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

但是回复并未发布到回复主题。我遵循了 Spring Kafka 文档。我正在尝试在没有@KafkaListener的情况下实现。我想发布回复消息来回复主题,而不使用@KafkaListener。你能帮我解决这个问题吗?

我正在寻找一种解决方案来发送回复主题的响应,而不使用@KafkaListener。我尝试了通过网络提供的不同解决方案,但不知何故,响应消息没有发布到回复主题,这导致每次都出现 ReplyTime out。

apache-kafka spring-kafka kafka-consumer-api
1个回答
0
投票

如果您不使用

@KafkaListener
,那么您只需直接在代码中使用
KafkaTemplate
即可生成该回复。这本质上就是框架内部为
@KafkaListener
中的
MessagingMessageListenerAdapter
所做的事情。

那个

ConcurrentKafkaListenerContainerFactory
是专门为
@KafkaListener
设计的。如果您手动使用
AcknowledgingMessageListener
,那么您就需要自己实现一切。

© www.soinside.com 2019 - 2024. All rights reserved.