有没有一种简单的方法来判断消费者(使用 Spring Boot 和 @KafkaListener 创建)是否正常运行? 这包括 - 可以访问和轮询代理、至少分配一个分区等。
我看到有多种方法可以订阅不同的生命周期事件,但这似乎是一个非常脆弱的解决方案。
提前致谢!
您可以使用
AdminClient
获取当前群组状态...
@SpringBootApplication
public class So56134056Application {
public static void main(String[] args) {
SpringApplication.run(So56134056Application.class, args);
}
@Bean
public NewTopic topic() {
return new NewTopic("so56134056", 1, (short) 1);
}
@KafkaListener(id = "so56134056", topics = "so56134056")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaAdmin admin) {
return args -> {
try (AdminClient client = AdminClient.create(admin.getConfig())) {
while (true) {
Map<String, ConsumerGroupDescription> map =
client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get(10, TimeUnit.SECONDS);
System.out.println(map);
System.in.read();
}
}
};
}
}
{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8, clientId=consumer-2, host=/127.0.0.1, assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}
我们一直在考虑将
getLastPollTime()
暴露给侦听器容器 API。
getAssignedPartitions()
自2.1.3起可用。
我知道您在帖子中没有提到这一点 - 但如果您随后在 AWS 中部署并在 ELB 扩展环境中使用此类运行状况检查,请注意将此类项目添加到运行状况检查中。
例如,可能发生的一种情况是您的应用程序失去了与 Kafka 的连接 - 您的运行状况检查变为红色 - 然后 Elastic beanstalk 开始杀死并重新启动您的实例的过程(这将不断发生,直到您的 Kafka 实例再次可用) )。这可能会很昂贵!
还有一个更普遍的哲学问题,即健康检查是否应该“级联故障”,例如kafka 已关闭,因此连接到 kafka 的应用程序声称它已关闭,链中的下一个应用程序也会执行相同的操作,等等。这通常更通常通过断路器来实现,断路器旨在最大程度地减少注定失败的缓慢调用。
您可以使用AdminClient查看主题描述。
final AdminClient client = AdminClient.create(kafkaConsumerFactory.getConfigurationProperties());
final String topic = "someTopicName";
final DescribeTopicsResult describeTopicsResult = client.describeTopics(Collections.singleton(topic));
final KafkaFuture<TopicDescription> future = describeTopicsResult.values().get(topic);
try {
// for healthcheck purposes we're fetching the topic description
future.get(10, TimeUnit.SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException("Failed to retrieve topic description for topic: " + topic, e);
}
我已使用上次轮询秒前指标进行健康检查,并在我的应用程序中成功实现了它。请检查此样本以供参考。
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class ConsumerHealthIndicator implements HealthIndicator {
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
private final AppPropertyConfig appPropertyConfig;
@Override
public Health health() {
final Collection<MessageListenerContainer> allListenerContainers = kafkaListenerEndpointRegistry.getAllListenerContainers();
Optional<Double> pollInterval = allListenerContainers.stream()
.map(MessageListenerContainer::metrics)
.map(Map::values)
.flatMap(Collection::stream)
.map(Map::entrySet)
.flatMap(Collection::stream)
.filter(entry -> entry.getKey().name().equalsIgnoreCase("last-poll-seconds-ago"))
.map(Map.Entry::getValue)
.map(metric -> (Double) metric.metricValue())
.filter(value -> value > (Double.parseDouble(appPropertyConfig.getKafkaMaxPollInterval()) / 1000))
.findFirst();
pollInterval.ifPresent(value -> log.info("Delay in poll {} sec", value));
Map<String, ConsumerDetails> consumersHealth = allListenerContainers.stream()
.map(this::buildConsumerDetails)
.collect(Collectors.toMap(ConsumerDetails::getName, Function.identity()));
return Health.status(pollInterval.isEmpty() ? Status.UP : Status.DOWN)
.withDetails(consumersHealth)
.build();
}
private ConsumerDetails buildConsumerDetails(MessageListenerContainer messageListenerContainer) {
String partitions = messageListenerContainer.getAssignedPartitions().stream()
.map(TopicPartition::partition)
.map(Object::toString)
.collect(Collectors.joining(","));
return ConsumerDetails.builder()
.name(messageListenerContainer.getGroupId())
.partitions(partitions)
.running(messageListenerContainer.isRunning())
.build();
}
@Getter
@Builder
private static class ConsumerDetails {
String name;
String partitions;
boolean running;
}
}