如何制作异步Kafka消息

问题描述 投票:0回答:1

我想从 App1 发送一个对象到 App2 更新它然后将其返回到 App1。

当前对象没有更新,App1 打印它最初在回调函数中发送的对象。我需要App1先等待App2处理数据。

应用1:

@Autowired
KafkaTemplate<String, ParsingInfo> kafkaTemplate;
...

System.out.println("sending message to App2");
CompletableFuture<SendResult<String, ParsingInfo>> future = kafkaTemplate.send(Properties.TopicNames.PARSE, parsingInfo);
// create listener and wait for response for CompletableFuture
future.whenCompleteAsync((result, ex) -> 
{
    System.out.println("get results");
    if (ex != null) 
    {
        ex.printStackTrace();
        System.out.println("Error sending message: " + ex.getMessage());
    } else {
        ParsingInfo newInfo = new ParsingInfo();
        newInfo.copyFrom(result.getProducerRecord().value());
        System.out.println("success-getProducerRecord 0: " + newInfo.getSopSaveData());
    }
});

应用2:

KafkaListener(topics = KafkaProperties.TopicNames.PARSE, groupId = KafkaProperties.ConsumerGroupIds.SMART_INTERPRETER)
public CompletableFuture<SendResult<String, ParsingInfo>> listen(ParsingInfo parsingInfo) 
{
    System.out.println("Received Message: " + parsingInfo.getSopSaveData());
    CompletableFuture<SendResult<String, ParsingInfo>> future = new CompletableFuture<>();
    try 
    {
        parsingInfo.setSopSaveData("test");
        System.out.println("Sending Message: " + parsingInfo.getSopSaveData());
        future.complete(new SendResult<String, ParsingInfo>(new ProducerRecord<String, ParsingInfo>(KafkaProperties.TopicNames.PARSE, parsingInfo), null));
        return future;
    } 
    catch (Exception e) 
    {
        future.completeExceptionally(e);
        return future;
    }
}

我已将 @EnableAsync 添加到生产者和消费者,但没有运气。这种逻辑可能吗?

java spring-kafka
1个回答
0
投票

看来您需要回到理论并重新评估消息传递中的

producer
consumer
以及经纪人如何参与其中。 无论您使用什么消息系统,行为都是一样的。

因此,当我们将数据发布到代理时,我们会说将该数据放入哪个目的地。当数据成功到达目的地时,生产者会收到确认。就是这样:制作人的工作已经完成。该目的地的消费者可能根本不存在。这是消息传递的一般目的:以可靠的方式区分数据生成和数据消费。

不可靠近地面。根据您的

@KafkaListener
,您确实希望生成一些数据作为回复。因此,请考虑使用
ReplyingKafkaTemplate
代替:https://docs.spring.io/spring-kafka/reference/kafka/sending-messages.html#replying-template

© www.soinside.com 2019 - 2024. All rights reserved.