Kafka消费者健康检查

问题描述 投票:0回答:4

有没有一种简单的方法来判断消费者(使用 Spring Boot 和 @KafkaListener 创建)是否正常运行? 这包括 - 可以访问和轮询代理、至少分配一个分区等。

我看到有多种方法可以订阅不同的生命周期事件,但这似乎是一个非常脆弱的解决方案。

提前致谢!

spring-kafka
4个回答
5
投票

您可以使用

AdminClient
获取当前群组状态...

@SpringBootApplication
public class So56134056Application {

    public static void main(String[] args) {
        SpringApplication.run(So56134056Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56134056", 1, (short) 1);
    }

    @KafkaListener(id = "so56134056", topics = "so56134056")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaAdmin admin) {
        return args -> {
            try (AdminClient client = AdminClient.create(admin.getConfig())) {
                while (true) {
                    Map<String, ConsumerGroupDescription> map =
                            client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get(10, TimeUnit.SECONDS);
                    System.out.println(map);
                    System.in.read();
                }
            }
        };
    }

}

{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8, clientId=consumer-2, host=/127.0.0.1, assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}

我们一直在考虑将

getLastPollTime()
暴露给侦听器容器 API。

getAssignedPartitions()
自2.1.3起可用。


1
投票

我知道您在帖子中没有提到这一点 - 但如果您随后在 AWS 中部署并在 ELB 扩展环境中使用此类运行状况检查,请注意将此类项目添加到运行状况检查中。

例如,可能发生的一种情况是您的应用程序失去了与 Kafka 的连接 - 您的运行状况检查变为红色 - 然后 Elastic beanstalk 开始杀死并重新启动您的实例的过程(这将不断发生,直到您的 Kafka 实例再次可用) )。这可能会很昂贵!

还有一个更普遍的哲学问题,即健康检查是否应该“级联故障”,例如kafka 已关闭,因此连接到 kafka 的应用程序声称它已关闭,链中的下一个应用程序也会执行相同的操作,等等。这通常更通常通过断路器来实现,断路器旨在最大程度地减少注定失败的缓慢调用。


0
投票

您可以使用AdminClient查看主题描述。

final AdminClient client = AdminClient.create(kafkaConsumerFactory.getConfigurationProperties());

final String topic = "someTopicName";

final DescribeTopicsResult describeTopicsResult = client.describeTopics(Collections.singleton(topic));
final KafkaFuture<TopicDescription> future = describeTopicsResult.values().get(topic);
                    
try {
  // for healthcheck purposes we're fetching the topic description
  future.get(10, TimeUnit.SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
  throw new RuntimeException("Failed to retrieve topic description for topic: " + topic, e);
}

0
投票

我已使用上次轮询秒前指标进行健康检查,并在我的应用程序中成功实现了它。请检查此样本以供参考。

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
@Component
@RequiredArgsConstructor
public class ConsumerHealthIndicator implements HealthIndicator {

private final KafkaListenerEndpointRegistry     kafkaListenerEndpointRegistry;
private final AppPropertyConfig appPropertyConfig;

@Override
public Health health() {
    final Collection<MessageListenerContainer> allListenerContainers = kafkaListenerEndpointRegistry.getAllListenerContainers();

    Optional<Double> pollInterval = allListenerContainers.stream()
            .map(MessageListenerContainer::metrics)
            .map(Map::values)
            .flatMap(Collection::stream)
            .map(Map::entrySet)
            .flatMap(Collection::stream)
            .filter(entry -> entry.getKey().name().equalsIgnoreCase("last-poll-seconds-ago"))
            .map(Map.Entry::getValue)
            .map(metric -> (Double) metric.metricValue())
            .filter(value -> value > (Double.parseDouble(appPropertyConfig.getKafkaMaxPollInterval()) / 1000))
            .findFirst();

    pollInterval.ifPresent(value -> log.info("Delay in poll {} sec", value));

    Map<String, ConsumerDetails> consumersHealth = allListenerContainers.stream()
            .map(this::buildConsumerDetails)
            .collect(Collectors.toMap(ConsumerDetails::getName, Function.identity()));

    return Health.status(pollInterval.isEmpty() ? Status.UP : Status.DOWN)
            .withDetails(consumersHealth)
            .build();
}

private ConsumerDetails buildConsumerDetails(MessageListenerContainer messageListenerContainer) {
    String partitions = messageListenerContainer.getAssignedPartitions().stream()
            .map(TopicPartition::partition)
            .map(Object::toString)
            .collect(Collectors.joining(","));

    return ConsumerDetails.builder()
            .name(messageListenerContainer.getGroupId())
            .partitions(partitions)
            .running(messageListenerContainer.isRunning())
            .build();
}

@Getter
@Builder
private static class ConsumerDetails {
    String name;
    String partitions;
    boolean running;
}
}
© www.soinside.com 2019 - 2024. All rights reserved.