美好的一天。 我正在尝试使用 ReplyingKafkaTemplate 实现同步请求回复消息流。作为个人 POC,它运行良好。但我的应用程序已经实现了 kafka 来手动使用消息。我正在使用 ConcurrentMessageListenerContainer,如下所示。
@Bean
public ConcurrentMessageListenerContainer<String, String> container(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
使用 KafkaTemplate 的生产者
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
App 有自定义 MessageListener 来消费实现 AcknowledgingMessageListener 的消息。 公共类 KafkaListener 实现
public class KafkaListener implements AcknowledgingMessageListener {
...
@Override
public void onMessage(ConsumerRecord consumeRecord, Acknowledgment acknowledgment) {
}
}
上述实现对于发布和消费消息来说效果很好。但现在我正在尝试扩展它以支持使用 ReplyingKafkaTemplate 同步 Kafka 消息流。我将 ReplyingKafkaTemplate 添加到这个框架中。它发布消息来请求主题和侦听器消费消息。
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory()
throws IOException {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2);
factory.setStatefulRetry(true);
**factory.setReplyTemplate(kafkaTemplate);** // to support REPLY_TO topic for req/reply
pattern
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
但是回复并未发布到回复主题。我遵循了 Spring Kafka 文档。我正在尝试在没有@KafkaListener的情况下实现。我想发布回复消息来回复主题,而不使用@KafkaListener。你能帮我解决这个问题吗?
我正在寻找一种解决方案来发送回复主题的响应,而不使用@KafkaListener。我尝试了通过网络提供的不同解决方案,但不知何故,响应消息没有发布到回复主题,这导致每次都出现 ReplyTime out。
如果您不使用
@KafkaListener
,那么您只需直接在代码中使用 KafkaTemplate
即可生成该回复。这本质上就是框架内部为 @KafkaListener
中的 MessagingMessageListenerAdapter
所做的事情。
那个
ConcurrentKafkaListenerContainerFactory
是专门为 @KafkaListener
设计的。如果您手动使用 AcknowledgingMessageListener
,那么您就需要自己实现一切。