我想从 App1 发送一个对象到 App2 更新它然后将其返回到 App1。
当前对象没有更新,App1 打印它最初在回调函数中发送的对象。我需要App1先等待App2处理数据。
应用1:
@Autowired
KafkaTemplate<String, ParsingInfo> kafkaTemplate;
...
System.out.println("sending message to App2");
CompletableFuture<SendResult<String, ParsingInfo>> future = kafkaTemplate.send(Properties.TopicNames.PARSE, parsingInfo);
// create listener and wait for response for CompletableFuture
future.whenCompleteAsync((result, ex) ->
{
System.out.println("get results");
if (ex != null)
{
ex.printStackTrace();
System.out.println("Error sending message: " + ex.getMessage());
} else {
ParsingInfo newInfo = new ParsingInfo();
newInfo.copyFrom(result.getProducerRecord().value());
System.out.println("success-getProducerRecord 0: " + newInfo.getSopSaveData());
}
});
应用2:
KafkaListener(topics = KafkaProperties.TopicNames.PARSE, groupId = KafkaProperties.ConsumerGroupIds.SMART_INTERPRETER)
public CompletableFuture<SendResult<String, ParsingInfo>> listen(ParsingInfo parsingInfo)
{
System.out.println("Received Message: " + parsingInfo.getSopSaveData());
CompletableFuture<SendResult<String, ParsingInfo>> future = new CompletableFuture<>();
try
{
parsingInfo.setSopSaveData("test");
System.out.println("Sending Message: " + parsingInfo.getSopSaveData());
future.complete(new SendResult<String, ParsingInfo>(new ProducerRecord<String, ParsingInfo>(KafkaProperties.TopicNames.PARSE, parsingInfo), null));
return future;
}
catch (Exception e)
{
future.completeExceptionally(e);
return future;
}
}
我已将 @EnableAsync 添加到生产者和消费者,但没有运气。这种逻辑可能吗?
看来您需要回到理论并重新评估消息传递中的
producer
和consumer
以及经纪人如何参与其中。
无论您使用什么消息系统,行为都是一样的。
因此,当我们将数据发布到代理时,我们会说将该数据放入哪个目的地。当数据成功到达目的地时,生产者会收到确认。就是这样:制作人的工作已经完成。该目的地的消费者可能根本不存在。这是消息传递的一般目的:以可靠的方式区分数据生成和数据消费。
不可靠近地面。根据您的
@KafkaListener
,您确实希望生成一些数据作为回复。因此,请考虑使用 ReplyingKafkaTemplate
代替:https://docs.spring.io/spring-kafka/reference/kafka/sending-messages.html#replying-template