Spring Kafka Consumer 即使未提交偏移量也会移动到下一个偏移量

问题描述 投票:0回答:1

我有一个 spring kafka 消费者,它从具有 10 个分区的 kafka 主题读取消息,我只有 1 个正在运行的 kafka 消费者实例。我正在手动提交偏移量并禁用自动提交,我正在记录哪个偏移量消费者正在处理,即使在处理消息时出现异常并且未提交偏移量,消费者也会移动到下一个偏移量。这怎么可能。

Spring Kafka 配置:

@Bean
    public ConsumerFactory<String, String> uploadFileConsumerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
       props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
       props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
       props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
       props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule   " +
               "required username='"+kafkaUsername+"'   password='"+kafkaPassword+"';");
       props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
       props.put("ssl.truststore.location", trustStoreLoc);
       props.put("ssl.truststore.password", trustStorePassword);

       //retry backoff ms
       //Error handler retries

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }

   @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> uploadFileConsumerKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(uploadFileConsumerFactory());
        factory.setMissingTopicsFatal(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setCommonErrorHandler(new KafkaErrorHandler());
        return factory;
    }

应用程序日志

s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] **Found no committed offset for partition** pp.qmct.s3.file.tracker-0
2024-01-19 16:57:01.717  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-1
2024-01-19 16:57:01.717  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-2
2024-01-19 16:57:01.717  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-3
2024-01-19 16:57:01.717  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-4
2024-01-19 16:57:01.719  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-5
2024-01-19 16:57:01.719  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-6
2024-01-19 16:57:01.719  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-7
2024-01-19 16:57:01.719  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-8
2024-01-19 16:57:01.719  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-9
2024-01-19 16:57:01.896  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-2 to position FetchPosition{offset=684457, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-11:32401 (id: 1 rack: null)], epoch=74}}.
2024-01-19 16:57:01.896  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-5 to position FetchPosition{offset=674758, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-11:32401 (id: 1 rack: null)], epoch=74}}.
2024-01-19 16:57:01.898  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-8 to position FetchPosition{offset=687910, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-11:32401 (id: 1 rack: null)], epoch=74}}.
2024-01-19 16:57:01.909  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-0 to position FetchPosition{offset=684558, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-10:32400 (id: 0 rack: null)], epoch=70}}.
2024-01-19 16:57:01.909  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-3 to position FetchPosition{offset=680112, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-10:32400 (id: 0 rack: null)], epoch=69}}.
2024-01-19 16:57:01.909  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-6 to position FetchPosition{offset=691661, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-10:32400 (id: 0 rack: null)], epoch=70}}.
2024-01-19 16:57:01.909  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-9 to position FetchPosition{offset=702081, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{ (id: 0 rack: null)], epoch=68}}.
2024-01-19 16:57:01.916  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-1 to position FetchPosition{offset=678786, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ (id: 2 rack: null)], epoch=81}}.
2024-01-19 16:57:01.916  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-4 to position FetchPosition{offset=682934, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ (id: 2 rack: null)], epoch=80}}.
2024-01-19 16:57:01.916  INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-7 to position FetchPosition{offset=674101, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional

2024-01-19 16:36:51.852  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : **Current offset: 674101**
2024-01-19 16:36:51.852  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Device Mode: EUT
2024-01-19 16:36:51.852  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Received S3 key: 20240118/73dbf7e6-873a-41c7-b1ed-63f37b8f7fda/68f18ff4-cd96-43de-8041-561e17f8d2bb/646631341.1705570404281
2024-01-19 16:36:51.853  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : DeviceId: 73dbf7e6-873a-41c7-b1ed-63f37b8f7fda, FileUID: 68f18ff4-cd96-43de-8041-561e17f8d2bb, FileName: 646631341.1705570404281.
2024-01-19 16:36:51.853  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Downloading an object with key= 20240118/73dbf7e6-873a-41c7-b1ed-63f37b8f7fda/68f18ff4-cd96-43de-8041-561e17f8d2bb/646631341.1705570404281
2024-01-19 16:36:52.304 ERROR 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Exception while downloading key from s3 buckets: 
2024-01-19 16:36:52.305 ERROR 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Exception Trace: {}[Ljava.lang.StackTraceElement;@14afe3b8
2024-01-19 16:36:52.305  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Total time taken to process file 68f18ff4-cd96-43de-8041-561e17f8d2bb in ms: 453
2024-01-19 16:36:52.305  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : **Current offset: 674102**
2024-01-19 16:36:52.305  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Device Mode: EUT
2024-01-19 16:36:52.305  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Received S3 key: 20240118/9e93b360-f8e4-44e8-988c-64a26538b017/f93f19f1-004c-45fe-8791-81fe282e7d95/646631341.1705570405539
2024-01-19 16:36:52.305  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : DeviceId: 9e93b360-f8e4-44e8-988c-64a26538b017, FileUID: f93f19f1-004c-45fe-8791-81fe282e7d95, FileName: 646631341.1705570405539.
2024-01-19 16:36:52.305  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Downloading an object with key= 20240118/9e93b360-f8e4-44e8-988c-64a26538b017/f93f19f1-004c-45fe-8791-81fe282e7d95/646631341.1705570405539
2024-01-19 16:36:52.367 ERROR 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Exception while downloading key from s3 buckets: Access Denied 
2024-01-19 16:36:52.367 ERROR 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Exception Trace: {}[Ljava.lang.StackTraceElement;@74072ede
2024-01-19 16:36:52.367  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : Total time taken to process file f93f19f1-004c-45fe-8791-81fe282e7d95 in ms: 62
2024-01-19 16:36:52.367  INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers     : **Current offset: 674103**

异常情况下的消费者代码

catch (Exception ex){
            log.error("Exception while downloading key from s3 buckets: {}", ex.getMessage());
            log.error("Exception Trace: {}" +ex.getStackTrace());
            if(ex instanceof AmazonS3Exception && ((AmazonS3Exception) ex).getStatusCode()== 404){
                log.info("The S3 key: {} doesn't exists in S3 bucket", s3Key);
                log.info("Committing the offset: {}", offset);
                acknowledgment.acknowledge();
            }

        }
        log.info("Total time taken to process file {} in ms: {}",fileUid, (System.currentTimeMillis()- startTimeMsMessage));
    }
'''
apache-kafka spring-kafka
1个回答
0
投票

最喜欢的是消费者组正在重新平衡,并且消费者正在从其他分区读取消息。检查是否超过最大轮询时间值或尝试设置客户端配置的保持活动设置

© www.soinside.com 2019 - 2024. All rights reserved.