如何正确处理Spring Kafka Producer发送消息失败?

问题描述 投票:0回答:1

我有一个 Spring Kafka 应用程序,它接收 HTTP 请求并将其有效负载发送到 Kafka 主题。我想预见以下不成功的情况:

  1. 最初,应用程序正常工作:它接收 HTTP 请求并在 Kafka 中发布数据。
  2. 在某个时刻,由于任何原因,发送到 Kafka 变得不可能。
  3. 应用程序收到新的 HTTP 请求,尝试在 Kafka 中发送数据,但失败。

在这种情况下,应用程序无法无限重试在 Kafka 中发布,因为它应该返回 HTTP 响应:无论成功与否。

我做了什么

考虑到这一点,我为生产者配置了

delivery.timeout.ms
属性。并以这种方式实现发送到Kafka:

  ... 
  @Value("${spring.kafka.producer.properties[delivery.timeout.ms]}")
  private Long deliveryTimeoutMs;

  public void sendMessage(String data) throws InterruptedException {
    try {
      kafkaTemplate.send(topic, data).get(deliveryTimeoutMs, MILLISECONDS);

    } catch (CancellationException | ExecutionException | TimeoutException e) {
      String msg = String.format(UNABLE_TO_PUBLISH_DATA_IN_KAFKA, topic, data);
      log.error(msg, e);
      throw new KafkaProducerServiceException(msg, e);

    } catch (InterruptedException e) {
      String msg = String.format(UNABLE_TO_PUBLISH_DATA_IN_KAFKA, topic, data);
      log.error(msg, e);
      throw e;
    }
  }

因此,应用程序尝试在 Kafka 中发送数据,等待

delivery.timeout.ms
属性中指定的相同时间段,如果失败,它将返回 500 HTTP 响应。

问题 当我尝试执行以下操作时:

  1. 运行我的应用程序并在 Docker 中运行 Kafka。
  2. 最初进行一些成功的发布。
  3. 暂停 Kafka 容器。
  4. 向应用程序发送另一个 HTTP 请求(并在
    delivery.timeout.ms
    后收到 500 响应)
  5. 再次运行 Kafka 容器。

然后我看到 来自失败的 HTTP 请求的数据无论如何都已在 Kafka 中发布了 – 即使在

delivery.timeout.ms
之后。甚至在更长的时间(比如 30 分钟)之后。但发送方系统已经收到500响应。如果重试此失败的请求,数据将在 Kafka 主题中重复。我怎样才能避免这种情况?

附注如果您发现我的思路有任何错误,请告诉我。

谢谢!

java http apache-kafka spring-kafka kafka-producer-api
1个回答
0
投票

如果你低于spring kafka 3.0 下面是一个可能的解决方案 希望这有帮助

                var future = template.send(producerRecordSuccess).completable();
                future.whenComplete((result,ex) -> {
                   if (ex == null){
                       //return 200
                   }
                   else if(ex != null){
                       //return 503
                   }
                });
© www.soinside.com 2019 - 2024. All rights reserved.