如果我不关闭kafka生产者,会发生什么

问题描述 投票:6回答:1

我正在处理xml,我需要为每条记录发送一条消息,当我收到最后一条记录时,我关闭了kafka生产者,这里的问题是kafka生产者的send方法是异步的,因此,有时当我关闭时它拖出的生产者java.lang.IllegalStateException: Cannot send after the producer is closed.我读过某个地方,可以让生产者保持打开状态。我的问题是:这意味着什么,或者是否有更好的解决方案。

---编辑---

<list>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
...
</list>

想象以下情况:

  • 我们阅读标签并创建kafka生产者
  • 每个元素我们读取其属性,生成一个json对象,然后使用send方法将其发送到kafka。-当我们读取元素时,我们在生产者中调用close方法

问题的数量因此可能是80k,有时,当我们调用断开连接方法时,它会继续以异步方式发送消息。因此,我们需要先调用flush方法,但这会影响性能

java apache-kafka kafka-producer-api
1个回答
6
投票

您应在致电Producer.flush()之前先致电Producer.close()。这是一个阻塞的调用,不会在发送所有记录之前返回。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.