我有一个 spring kafka 消费者,它从具有 10 个分区的 kafka 主题读取消息,我只有 1 个正在运行的 kafka 消费者实例。我正在手动提交偏移量并禁用自动提交,我正在记录哪个偏移量消费者正在处理,即使在处理消息时出现异常并且未提交偏移量,消费者也会移动到下一个偏移量。这怎么可能。
Spring Kafka 配置:
@Bean
public ConsumerFactory<String, String> uploadFileConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username='"+kafkaUsername+"' password='"+kafkaPassword+"';");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put("ssl.truststore.location", trustStoreLoc);
props.put("ssl.truststore.password", trustStorePassword);
//retry backoff ms
//Error handler retries
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> uploadFileConsumerKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(uploadFileConsumerFactory());
factory.setMissingTopicsFatal(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setCommonErrorHandler(new KafkaErrorHandler());
return factory;
}
应用程序日志
s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] **Found no committed offset for partition** pp.qmct.s3.file.tracker-0
2024-01-19 16:57:01.717 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-1
2024-01-19 16:57:01.717 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-2
2024-01-19 16:57:01.717 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-3
2024-01-19 16:57:01.717 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-4
2024-01-19 16:57:01.719 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-5
2024-01-19 16:57:01.719 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-6
2024-01-19 16:57:01.719 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-7
2024-01-19 16:57:01.719 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-8
2024-01-19 16:57:01.719 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Found no committed offset for partition pp.qmct.s3.file.tracker-9
2024-01-19 16:57:01.896 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-2 to position FetchPosition{offset=684457, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-11:32401 (id: 1 rack: null)], epoch=74}}.
2024-01-19 16:57:01.896 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-5 to position FetchPosition{offset=674758, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-11:32401 (id: 1 rack: null)], epoch=74}}.
2024-01-19 16:57:01.898 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-8 to position FetchPosition{offset=687910, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-11:32401 (id: 1 rack: null)], epoch=74}}.
2024-01-19 16:57:01.909 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-0 to position FetchPosition{offset=684558, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-10:32400 (id: 0 rack: null)], epoch=70}}.
2024-01-19 16:57:01.909 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-3 to position FetchPosition{offset=680112, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-10:32400 (id: 0 rack: null)], epoch=69}}.
2024-01-19 16:57:01.909 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-6 to position FetchPosition{offset=691661, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[-10:32400 (id: 0 rack: null)], epoch=70}}.
2024-01-19 16:57:01.909 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-9 to position FetchPosition{offset=702081, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{ (id: 0 rack: null)], epoch=68}}.
2024-01-19 16:57:01.916 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-1 to position FetchPosition{offset=678786, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ (id: 2 rack: null)], epoch=81}}.
2024-01-19 16:57:01.916 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-4 to position FetchPosition{offset=682934, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ (id: 2 rack: null)], epoch=80}}.
2024-01-19 16:57:01.916 INFO 22380 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-episteme-s3-downloader-v4-1, groupId=episteme-s3-downloader-v4] Resetting offset for partition pp.qmct.s3.file.tracker-7 to position FetchPosition{offset=674101, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional
2024-01-19 16:36:51.852 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : **Current offset: 674101**
2024-01-19 16:36:51.852 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Device Mode: EUT
2024-01-19 16:36:51.852 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Received S3 key: 20240118/73dbf7e6-873a-41c7-b1ed-63f37b8f7fda/68f18ff4-cd96-43de-8041-561e17f8d2bb/646631341.1705570404281
2024-01-19 16:36:51.853 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : DeviceId: 73dbf7e6-873a-41c7-b1ed-63f37b8f7fda, FileUID: 68f18ff4-cd96-43de-8041-561e17f8d2bb, FileName: 646631341.1705570404281.
2024-01-19 16:36:51.853 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Downloading an object with key= 20240118/73dbf7e6-873a-41c7-b1ed-63f37b8f7fda/68f18ff4-cd96-43de-8041-561e17f8d2bb/646631341.1705570404281
2024-01-19 16:36:52.304 ERROR 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Exception while downloading key from s3 buckets:
2024-01-19 16:36:52.305 ERROR 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Exception Trace: {}[Ljava.lang.StackTraceElement;@14afe3b8
2024-01-19 16:36:52.305 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Total time taken to process file 68f18ff4-cd96-43de-8041-561e17f8d2bb in ms: 453
2024-01-19 16:36:52.305 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : **Current offset: 674102**
2024-01-19 16:36:52.305 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Device Mode: EUT
2024-01-19 16:36:52.305 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Received S3 key: 20240118/9e93b360-f8e4-44e8-988c-64a26538b017/f93f19f1-004c-45fe-8791-81fe282e7d95/646631341.1705570405539
2024-01-19 16:36:52.305 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : DeviceId: 9e93b360-f8e4-44e8-988c-64a26538b017, FileUID: f93f19f1-004c-45fe-8791-81fe282e7d95, FileName: 646631341.1705570405539.
2024-01-19 16:36:52.305 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Downloading an object with key= 20240118/9e93b360-f8e4-44e8-988c-64a26538b017/f93f19f1-004c-45fe-8791-81fe282e7d95/646631341.1705570405539
2024-01-19 16:36:52.367 ERROR 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Exception while downloading key from s3 buckets: Access Denied
2024-01-19 16:36:52.367 ERROR 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Exception Trace: {}[Ljava.lang.StackTraceElement;@74072ede
2024-01-19 16:36:52.367 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : Total time taken to process file f93f19f1-004c-45fe-8791-81fe282e7d95 in ms: 62
2024-01-19 16:36:52.367 INFO 15796 --- [ntainer#0-0-C-1] c.q.episteme.consumer.KafkaConsumers : **Current offset: 674103**
异常情况下的消费者代码
catch (Exception ex){
log.error("Exception while downloading key from s3 buckets: {}", ex.getMessage());
log.error("Exception Trace: {}" +ex.getStackTrace());
if(ex instanceof AmazonS3Exception && ((AmazonS3Exception) ex).getStatusCode()== 404){
log.info("The S3 key: {} doesn't exists in S3 bucket", s3Key);
log.info("Committing the offset: {}", offset);
acknowledgment.acknowledge();
}
}
log.info("Total time taken to process file {} in ms: {}",fileUid, (System.currentTimeMillis()- startTimeMsMessage));
}
'''
最喜欢的是消费者组正在重新平衡,并且消费者正在从其他分区读取消息。检查是否超过最大轮询时间值或尝试设置客户端配置的保持活动设置