Apache Kafka 生产者配置:'request.timeout.ms' VS。 “max.block.ms”属性

问题描述 投票:0回答:3

鉴于以下同步 kafka 生产者

Properties props = new Properties();
props.put("max.block.ms", 30000);
props.put("request.timeout.ms", 30000);
props.put("retries", 5);

KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);

//Send message
producer.send(producerRecord).get();

帮助我理解 request.timeout.msmax.block.ms 生产者配置之间的区别。是否包括所有重试的最长时间?或者每次重试都有自己的超时时间?

apache-kafka kafka-producer-api
3个回答
11
投票

request.timeout.ms 用于超时请求,我会将其设置为我可以等待响应的最长时间。

max.block.ms 用于生产者阻塞缓冲时间、序列化时间等。

详情请看这个。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient


11
投票

我发现接受的答案有点“薄”,所以这可能会帮助其他人。

您的代码中有两件重要的事情:

KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);

//Send message
producer.send(producerRecord).get();

producer.send(producerRecord)
- 由两部分组成:阻塞和非阻塞。阻挡部分是由一些“零件”组成的:

- Request metadata about brokers from zookeeper 
- Serialize message
- Choose a Partition 
- Place Message in the RecordAccumulator

现在,通常前三个步骤很快(第一个步骤在初始调用后缓存),而第四个步骤可能需要时间。发生这种情况是因为

RecordAccumulator
的空间有限 (
buffer.memory
),而您目前有多少空间取决于生产者客户端中的 other 线程(称为
Sender Thread
)。如果这个线程做得不好(从
RecordAccumulator
检索消息并将其发送到代理;顺便说一句,所有这些都有指标),您的
send
方法将被阻塞(
RecordAccumulator
中没有空间)直到有可用空间。

所有这 4 个步骤都允许被阻止最多

max.block.ms
。这就是 KIP 在谈论时的含义:

  • 元数据获取时间(从 Zookeeper 获取有关代理的元数据)
  • 缓冲满块时间(我所说的时间)
  • 序列化时间(定制序列化器)
  • 分区时间(定制分区器)

还有

delivery.timeout.ms
。这是消息发送到分区之前等待的总时间,包括:将记录推送到批次的时间(在
RecordAccumulator
中)+ 获取 ack 的时间(例如
all
并等待消息发送)跨副本复制)+ 将消息发送到代理的时间,包括所有重试(如果有)。

您可以将其视为从

send
方法到达所有副本并发回 ack 所需的时间。所有这段时间都必须低于
delivery.timeout.ms


在解释

request.timeout.ms
之前,恕我直言,了解
max.in.flight.requests.per.connection
是什么很重要,因为它们有一点联系。假设一批已准备好从
RecordAccumulator
发送到代理(因为其
batch.size
linger.ms
已完成)。该批次是否由所谓的“发送者线程”(客户端本身的线程,并且是!=调用
send
方法的线程)获取或不由
max.in.flight.requests.per.connection
定义。

您可以在任何时间点有多达

max.in.flight.requests.per.connection
并发 请求处于活动状态。一个稍微容易一点的思考方式是这样的。 “发送者线程”有一个它不断执行的特定循环,以伪代码表示:

while(true) {
    // check if there are batches to send
    // get the batches to send to the brokers
    // make requests to the broker
    // poll connections
    // etc
}

假设这是第一批发送的。 “发送者线程”递增

max.in.flight.requests.per.connection
,使其变为1;获取批次并将其发送给经纪人。此时它“不会”等待确认,而是返回到循环。依此类推,直到达到 5(max.in.flight.requests.per.connection 的默认值)。
现在假设有批次要发送到代理,发送者线程不会接受,因为它没有可用的请求(我们现在最多 5 个)。相反,它会“轮询连接”:它会向代理询问之前发送的结果

,其余的解释在这里

有了所有这些背景,是时候看看
request.timeout.ms

了,现在实际上很容易了。当客户端轮询连接时 - 尝试从代理获取每个正在进行的请求的响应,它可以在

request.timeout.ms
(默认情况下为 30 秒)内完成此操作。如果我们重试,该值将被重置。
    


0
投票

当我尝试向已关闭的代理/服务器发送请求时 (max.block.size = 60000,request.timeout.ms = 1000,delivery.timeout.ms = 2000,重试= 2,retry.backoff.ms = 100)

try { kafkaTemplate.send(producerRecord).get(); }catch (Exception e){ LOGGER.error("Error while sending request: {}", e); }

我在 60 秒后超时 -> max.block.ms

抛出异常:

发送失败;嵌套异常是 org.apache.kafka.common.errors.TimeoutException:60000 毫秒后元数据中不存在主题 DUMMY-TOPIC。

幕后是否发生任何重试?

超时是否应该更早出现?它应该花费不到 7 秒 -> 交付。超时 2 秒 3 次,退避时间仅为 100 毫秒

© www.soinside.com 2019 - 2024. All rights reserved.