我可以忽略org.apache.kafka.common.errors.NotLeaderForPartitionExceptions吗?

问题描述 投票:0回答:2

我的 Apache Kafka 生产者 (0.9.0.1) 间歇性地抛出一个

org.apache.kafka.common.errors.NotLeaderForPartitionException

我执行 Kafka 发送的代码类似于此

final Future<RecordMetadata> futureRecordMetadata = KAFKA_PRODUCER.send(new ProducerRecord<String, String>(kafkaTopic, UUID.randomUUID().toString(), jsonMessage));

try {
    futureRecordMetadata.get();
} catch (final InterruptedException interruptedException) {
    interruptedException.printStackTrace();
    throw new RuntimeException("sendKafkaMessage(): Failed due to InterruptedException(): " + sourceTableName + " " + interruptedException.getMessage());
} catch (final ExecutionException executionException) {
    executionException.printStackTrace();
    throw new RuntimeException("sendKafkaMessage(): Failed due to ExecutionException(): " + sourceTableName + " " + executionException.getMessage());
}

我在

NotLeaderForPartitionException
块内抓住了
catch (final ExecutionException executionException) {}

可以忽略这个特定的异常吗?

我的Kafka消息发送成功了吗?

exception apache-kafka kafka-producer-api
2个回答
25
投票

如果您收到

NotLeaderForPartitionException
,则您的数据写入成功。

每个主题分区由一个或多个 Broker 存储(其中一个领导者;其余代理称为追随者),具体取决于您的复制因子。生产者需要向领导者 Broker 发送新消息(到追随者的数据复制发生在内部)。

您的生产者客户端未连接到正确的 Broker,即连接到跟随者而不是领导者(或者连接到不再是跟随者的代理),并且该代理拒绝您的发送请求。如果领导者发生了变化,但生产者仍然拥有关于哪个代理是分区领导者的过时的缓存元数据,就会发生这种情况。


0
投票

我在 Kubernetes 中尝试使用 Kafka 集群时遇到了同样的问题。 Kubernetes 集群托管在云实例上。我尝试从我的本地机器运行Kafka生产者代码。在第一次运行期间,发生了

UnknownHostException
。我通过将 Kafka Kubernetes 服务地址添加到我的
/etc/hosts
文件中解决了这个问题。

示例:

10.160.160.60    kafka-controller-0.kafka-controller-headless.default.svc.cluster.local:9092,kafka-controller-1.kafka-controller-headless.default.svc.cluster.local:9092,kafka-controller-2.kafka-controller-headless.default.svc.cluster.local:9092

然而,我随后遇到了

NotLeaderOrFollowerException
。尽管尝试了多种解决方案,但似乎没有一个能正常工作。有时,重新启动本地应用程序可以暂时解决问题。但后续重启后问题又会出现。

最终,我将本地应用程序移至 Kafka 集群中,从而解决了问题。出现此问题的原因是我的

/etc/hosts
文件中的所有 Kafka 代理地址都指向同一 IP 地址。当生产者尝试连接 Kafka 主题领导者时,Kubernetes Proxy 将请求路由到随机 Kafka 节点,从而导致此问题。

我开始在本地 Docker 上使用单节点 Kafka。避免尝试连接到远程服务器上的 Kafka 集群。如有必要,您应该为每个 Kafka 节点开放对不同 IP:Port 组合的公共访问,并在本地应用程序配置中指定所有代理的列表。

© www.soinside.com 2019 - 2024. All rights reserved.