How do you handle a large backlog of Kafka messages in production, especially when deploying a new application?

Problem description · Votes: 0 · Answers: 2

I have newly deployed a Spring Boot application to production (on AWS). It consumes Kafka messages from various topics, and those topics had already accumulated thousands of messages (~25K) before the deployment.

I have observed that the Kafka consumer logs in my application only start printing once the Spring Boot application has started; however, the container keeps restarting because the ECS health check fails every 3 minutes. We also tried increasing the health check grace period to 15 minutes, but the container still keeps restarting.

INFO  - Started Application in 13.364 seconds (JVM running for 14.489)

In case this information helps, below is my Kafka configuration.

  • Spring Boot version: v2.6.6
  • Kafka version: 3.0.1
  • org.mybatis.spring.boot: 2.2.2

The Kafka consumer configuration is as follows:

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [***.com:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-appmanager-1
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = appmanager
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

The KafkaConsumerConfig methods are as follows:

@Bean
public ConsumerFactory<String, ABCMessage> consumerFactory() {
    Map<String, Object> props = getConsumerProperties(ABCDeserializer.class, FailedABCConfigRequestMessageProvider.class, ABCMessage.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ABCMessage> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ABCMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
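
getConsumerProperties is not shown in the question. As a purely hypothetical reconstruction, consistent with the consumer configuration dump above (StringDeserializer keys, ErrorHandlingDeserializer-wrapped values, manual commits) and assuming the usual kafka-clients/spring-kafka imports plus a bootstrapServers field, it might look roughly like this:

// Hypothetical sketch only; the values mirror the configuration dump above,
// not code from the question.
private Map<String, Object> getConsumerProperties(Class<?> valueDeserializer,
                                                  Class<?> failedValueProvider,
                                                  Class<?> targetType) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "appmanager");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // ErrorHandlingDeserializer delegates to the real value deserializer and,
    // on failure, calls the configured function instead of crashing the container.
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, valueDeserializer);
    props.put(ErrorHandlingDeserializer.VALUE_FUNCTION, failedValueProvider);
    // targetType is presumably consumed by the custom ABCDeserializer; omitted here.
    return props;
}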

A sample snippet of my consumer code is below:

@KafkaListener(topics = "${abc.config.topic.name}", containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, ABCMessage> cr) {
    logger.info("Received Message: {}", cr.value().toString());

    try {
        abcConfigProcessor.processMessage(cr.value());
    } catch (Exception e) {
        // Log and continue, so one bad record does not stop the listener;
        // passing the exception itself keeps the stack trace in the log.
        logger.error("Message cannot be processed", e);
    }
}

How can I deploy my application to production so that it gracefully absorbs the backlog in the background while still responding successfully to the periodic ECS health checks?

spring-boot apache-kafka amazon-ecs spring-kafka
2 Answers

0 votes

ECS aside, any new KafkaConsumer will need to be scaled out to, at most, the number of partitions in the topic. You then need to make sure your processing logic is not "too slow", otherwise the consumer group will rebalance and cause even more lag.
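
As a minimal sketch of that suggestion, assuming the topic has 6 partitions (the number is illustrative, not from the question), the container factory already shown in the question can be given one consumer thread per partition:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ABCMessage> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ABCMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // One consumer thread per partition; threads beyond the partition count sit idle.
    factory.setConcurrency(6); // illustrative: match your actual partition count
    return factory;
}

It also helps to bound the work per poll (e.g. lowering max.poll.records from the default 500) so that each batch is processed within max.poll.interval.ms (300000 ms in the configuration above); exceeding that interval evicts the consumer from the group and triggers exactly the rebalance this answer warns about.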


0 votes

As highlighted in the comments above, our problem was actually a misconfigured ECS health check: a security group/firewall rule was missing, so the required port was not allowed. Because the ping API could not reach the application, the ECS health check failed and the container kept restarting.
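
For completeness, the health check target itself can be a trivial HTTP endpoint that does no Kafka work, so a backlog being consumed in the background cannot fail it. A minimal sketch (the /ping path and PingController name are assumptions, not code from the question):

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical ping endpoint for the ECS health check; in our case the actual
// fix was the security group rule that makes this endpoint reachable at all.
@RestController
public class PingController {

    @GetMapping("/ping")
    public ResponseEntity<String> ping() {
        // Deliberately independent of Kafka consumption, so a large backlog
        // being drained in the background cannot fail the health check.
        return ResponseEntity.ok("pong");
    }
}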
