如何使用java获取kafka延迟

问题描述 投票:0回答:3

我目前开发了一个代码,可以显示主题、分区和日志偏移量。但我目前陷入如何获得分区滞后的问题上。我知道有一个 kafka offset 命令可以执行此功能,但我需要的是 java 代码。

public static void main(String[] args) throws Exception {
    System.out.println("START CONSUMER");final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Create the consumer using props.
    final Consumer<Long, String> consumer =  new KafkaConsumer<>(props);

    // Subscribe to the topic.
    int i = 0;
    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        TopicPartition partitiontemp = new TopicPartition(TOPIC, i);
        partitions.add(partitiontemp);
    }
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);

    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        System.out.printf("Topic: %s partitionID: %d log offset: %d \n", TOPIC, i, consumer.position(partitions.get(i)));
    }

    System.out.printf("CREATE CONSUMER DONE");
    consumer.close();

This is the output of my code

我需要做的是输出主题、分区、当前偏移、日志偏移和滞后。如何获取代码的滞后或如何获取代码的当前偏移量。 (参见图片了解所需的输出)。

Needed output

注意:我无法使用(foreach record)功能,因为我不能读取输入文件中的每条记录。

java apache-kafka kafka-consumer-api
3个回答
6
投票

要重现

kafka-consumer-groups
功能,您需要 Consumer 和 AdminClient 实例。

首先,使用 AdminClient,您可以调用

listConsumerGroupOffsets()
来检索主题分区列表以及特定组的已提交偏移量。

然后使用 Consumer 获取这些分区的结束偏移量。您使用的方法效率低下,不需要分配和查找结束偏移量。您只需拨打

endOffsets()
即可。

这足以重现屏幕截图中包含的数据。

kafka-consumer-groups
还使用
AdminClient.describeConsumerGroups()
打印分配给每个分区的组成员(如果有)。


1
投票

您可以通过从消费者那里获取 EndOffset 来获得 LAG

Set<TopicPartition> partitionSet = consumer.assignment();
Map<TopicPartition, Long> endOffsets =consumer.endOffsets(consumer.assignment());

然后迭代哪里超过设置

for(TopicPartition tp : partitionSet) { LOG.info("Topic :: {} ,EndOffset :: {}, currentOffset {}",tp.topic(),beginningOffsets.get(tp),endOffsets.get(tp), consumer.position(tp)); }

consumer.position(tp) -- 将获取当前偏移量,从 endoffset 中减去该偏移量,得到 LAG


0
投票

私有静态长calculateTopicLag(属性属性,字符串主题,字符串消费者组){ 尝试(AdminClient adminClient = AdminClient.create(属性)){ ListConsumerGroupOffsetsResult groupOffsetsResult = adminClient.listConsumerGroupOffsets(consumerGroup); Map offsets = groupOffsetsResult.partitionsToOffsetAndMetadata().get();

        long topicLag = 0;
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            long consumerOffset = entry.getValue().offset();

            KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(properties);
            consumer.assign(Collections.singletonList(topicPartition));
            long endOffset = consumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
            consumer.close();

            long partitionLag = endOffset - consumerOffset;
            topicLag += partitionLag;
        }

        return topicLag;
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
        return -1;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.