Kafka topic partitions paused by back pressure from a downstream consumer; eventually only one partition is drained


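In my project, I use spring-kafka (v3.1.1) and reactor-kafka (v1.3.22) to consume events from a particular topic; let's call it "topic-a". The downstream consumer takes about 50 ms to process each message. When a continuous stream of Kafka events is produced to the topic (for example, 1.1 million events in a row), the events are distributed evenly across the 5 partitions of "topic-a", about 225k records per partition. max.poll.records is set to 40 (I previously used 200 and hit the same problem).

While consuming, I can see the reactor-kafka code (in ConsumerEventLoop) pause all 5 partitions because of back pressure, and resume all 5 once the back pressure is relieved. While the partitions are paused, the Apache Kafka client skips fetching records for them, so the unconsumed records stay in the partitions. However, after the consumer has been paused by reactor-kafka many times, the logs eventually suggest that fetching is no longer skipped for all partitions. In the end, the events in 1 of the 5 partitions are drained and processed, but the offsets of the other 4 partitions simply jump to the latest offset, even though reactor-kafka never actually emitted their records.

Even after I restart the application, the positions of the other 4 partitions are initially before the end offset when the partitions are assigned. Then, as soon as Kafka starts fetching, all 4 positions are moved to the end offset, so the unprocessed messages in those 4 partitions are never drained. What is going wrong, and how can I drain the records of the other 4 partitions without the offsets being advanced?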
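To put the numbers above in perspective, here is a rough estimate of how long such a burst sits in the topic before it is consumed. This is a sketch under stated assumptions, not a measurement: it assumes the ~50 ms is the whole per-record cost and that throughput is capped by the flatMap concurrency of 3 used in the pipeline in the question, i.e. about 3 × (1000 / 50) = 60 records/s. The class name DrainTimeEstimate is hypothetical.

```java
import java.util.Locale;

// Back-of-the-envelope estimate: how long a backlog takes to drain if every record costs
// perRecordMillis and at most `concurrency` records are processed at the same time.
class DrainTimeEstimate {
    static double drainSeconds(long backlog, double perRecordMillis, int concurrency) {
        double recordsPerSecond = concurrency * (1000.0 / perRecordMillis);
        return backlog / recordsPerSecond;
    }

    public static void main(String[] args) {
        long backlog = 1_100_000L;     // burst size from the question
        double perRecordMillis = 50.0; // ~50 ms per message, from the question
        int concurrency = 3;           // assumption: the flatMap concurrency of 3 is the bottleneck
        double seconds = drainSeconds(backlog, perRecordMillis, concurrency);
        // prints "~18333 s (~5.1 h)"
        System.out.println(String.format(Locale.ROOT, "~%.0f s (~%.1f h)", seconds, seconds / 3600));
    }
}
```

Under those assumptions the backlog needs roughly five hours to drain, so the unconsumed records must remain in the topic for at least that long; any broker-side limit on how much data a partition keeps matters on that time scale.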

Below is a simplified version of my KafkaReceiver.receive code:

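// Imports used by this snippet (the template, converters and service calls are defined elsewhere)
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Mono;

reactiveKafkaConsumerTemplate
    .receive()
    .map(receiverRecord -> convertToCustomObject(receiverRecord))
    // Network call, so it can be slow; at most 3 calls in flight
    .flatMap(customObjectReceiverRecordTuple -> getDetails(customObjectReceiverRecordTuple._1), 3)
    // Network call, so it can be slow; at most 3 calls in flight
    .flatMap(detailedCustomObjectReceiverRecordTuple -> save(detailedCustomObjectReceiverRecordTuple._1), 3)
    .doOnNext(receiverRecord -> receiverRecord.receiverOffset().acknowledge())
    .map(receiverRecord -> true)
    // Emit a batch when 110 items are collected or 5 seconds pass, whichever comes first
    .bufferTimeout(110, Duration.ofSeconds(5))
    .map(List::size)
    .doOnNext(integer -> log.info("Saved {} detailedCustomObjects", integer))
    // Replaces the failed sequence with an empty one, so the pipeline completes here
    .onErrorResume(throwable -> {
        log.error("Error encountered, resuming...", throwable);
        return Mono.empty();
    })
    .subscribe();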

Here is a sample of the logs just before the consumer stops emitting/processing further events:

r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop        : Consumer woken
r.k.r.internals.ConsumerEventLoop        : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop        : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : Paused - back pressure
MyReactiveConsumerClass                  : Saved 61 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop        : Async committing: {topic-a-1=OffsetAndMetadata{offset=913053, leaderEpoch=null, metadata=''}}
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop        : Consumer woken
r.k.r.internals.ConsumerEventLoop        : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop        : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : Paused - back pressure
MyReactiveConsumerClass                  : Saved 55 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop        : Consumer woken
r.k.r.internals.ConsumerEventLoop        : Async committing: {topic-a-1=OffsetAndMetadata{offset=913111, leaderEpoch=null, metadata=''}}
r.k.r.internals.ConsumerEventLoop        : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop        : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : Paused - back pressure
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop        : Consumer woken
r.k.r.internals.ConsumerEventLoop        : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop        : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : Paused - back pressure
MyReactiveConsumerClass                  : Saved 64 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop        : Async committing: {topic-a-1=OffsetAndMetadata{offset=913170, leaderEpoch=null, metadata=''}}
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop        : Consumer woken
r.k.r.internals.ConsumerEventLoop        : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop        : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : Paused - back pressure
MyReactiveConsumerClass                  : Saved 58 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop        : Async committing: {topic-a-1=OffsetAndMetadata{offset=913226, leaderEpoch=null, metadata=''}}
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop        : Consumer woken
r.k.r.internals.ConsumerEventLoop        : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop        : Emitting 19 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : Paused - back pressure
MyReactiveConsumerClass                  : Saved 52 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop        : Consumer woken
r.k.r.internals.ConsumerEventLoop        : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop        : Async committing: {topic-a-1=OffsetAndMetadata{offset=913280, leaderEpoch=null, metadata=''}}
MyReactiveConsumerClass                  : Saved 31 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop        : Async committing: {topic-a-1=OffsetAndMetadata{offset=913295, leaderEpoch=null, metadata=''}}

Here is a sample of the logs in which the Apache Kafka client skips fetch requests because the partitions are paused:

o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-3 because it is paused
o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-1 because it is paused
o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-4 because it is paused
o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-2 because it is paused
o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-0 because it is paused
o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-3 because it is paused
o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-1 because it is paused
o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-4 because it is paused
o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-2 because it is paused
o.a.k.c.c.internals.AbstractFetch        : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-0 because it is paused
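The two log excerpts show a repeating cycle: reactor-kafka pauses every assigned partition once downstream demand reaches zero ("Paused - back pressure"), resumes them on the next request, and while they are paused the Kafka client skips them when fetching. A useful point when diagnosing the symptom is that pausing only suppresses fetches; it does not move a partition's position. The hypothetical PausedFetchModel below is a toy model of that consequence, not the real KafkaConsumer or ConsumerEventLoop. It also simplifies by capping records per partition, whereas the real max.poll.records caps the whole poll.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, NOT the real KafkaConsumer: pausing a partition only
// suppresses fetches for it; pause/resume never changes the partition's position.
class PausedFetchModel {
    final Map<String, Long> position = new LinkedHashMap<>();  // next offset to fetch, per partition
    final Map<String, Long> endOffset = new LinkedHashMap<>(); // log end offset, per partition
    final Set<String> paused = new HashSet<>();

    PausedFetchModel(Map<String, Long> endOffsets) {
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            position.put(e.getKey(), 0L);
            endOffset.put(e.getKey(), e.getValue());
        }
    }

    void pauseAll() { paused.addAll(position.keySet()); }

    void resumeAll() { paused.clear(); }

    // One fetch round: each non-paused partition returns up to maxPerPartition records
    // and its position advances by that many; paused partitions are skipped.
    long fetch(int maxPerPartition) {
        long fetched = 0;
        for (Map.Entry<String, Long> e : position.entrySet()) {
            String tp = e.getKey();
            if (paused.contains(tp)) {
                System.out.println("Skipping fetching records for " + tp + " because it is paused");
                continue; // position stays where it was
            }
            long n = Math.min(maxPerPartition, endOffset.get(tp) - e.getValue());
            e.setValue(e.getValue() + n);
            fetched += n;
        }
        return fetched;
    }

    public static void main(String[] args) {
        Map<String, Long> ends = new LinkedHashMap<>();
        for (int p = 0; p < 5; p++) ends.put("topic-a-" + p, 225_000L);
        PausedFetchModel model = new PausedFetchModel(ends);

        model.pauseAll();
        System.out.println("fetched while paused: " + model.fetch(40));
        model.resumeAll();
        System.out.println("fetched after resume: " + model.fetch(40));
        System.out.println("positions: " + model.position);
    }
}
```

Run as-is, it prints five "Skipping fetching..." lines, then 0 records fetched while paused and 200 after resuming, with every position at 40. In this model, repeated pause/resume cycles by themselves cannot push positions to the end offset; something else has to move them, which is consistent with the retention-related cause given in the answer.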
1 Answer

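I found the problem. This GitHub comment explains the root cause well: https://github.com/tulios/kafkajs/issues/1119#issuecomment-862983659. You need to make sure the topic's retention size in bytes (retention.bytes) is large enough.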
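The answer does not spell out the mechanism, so here is a hedged sketch of how a too-small retention size can produce the symptom described in the question. If the broker deletes records that a paused or lagging consumer has not reached yet, the consumer's position falls below the partition's log start offset; the offset is then out of range, and with auto.offset.reset=latest (Kafka's default) the consumer is reset to the end offset, silently skipping the backlog. The class RetentionResetModel is hypothetical and heavily simplified: every record has the same size, and records are deleted individually, whereas real Kafka deletes whole log segments.

```java
import java.util.List;

// Hypothetical toy model of ONE partition, assuming size-based retention and a consumer
// configured with auto.offset.reset=latest. Simplifications: fixed record size, and
// retention deletes individual records (real Kafka deletes whole log segments).
class RetentionResetModel {
    final long recordBytes;    // assumed fixed record size
    final long retentionBytes; // stands in for the topic's retention.bytes (enforced per partition)
    long logStartOffset = 0;   // oldest offset still on disk
    long logEndOffset = 0;     // offset the next produced record will get
    long position = 0;         // consumer's next offset to read
    long lost = 0;             // records the consumer jumped over and will never process

    RetentionResetModel(long recordBytes, long retentionBytes) {
        this.recordBytes = recordBytes;
        this.retentionBytes = retentionBytes;
    }

    // Append n records, then delete the oldest ones that no longer fit in retentionBytes.
    void produce(long n) {
        logEndOffset += n;
        long maxRetained = retentionBytes / recordBytes;
        logStartOffset = Math.max(logStartOffset, logEndOffset - maxRetained);
    }

    // Read up to n records. If the position was already deleted, the offset is out of range
    // and, with auto.offset.reset=latest, the consumer jumps to the log end.
    long consume(long n) {
        if (position < logStartOffset) {
            lost += logEndOffset - position;
            position = logEndOffset;
        }
        long read = Math.min(n, logEndOffset - position);
        position += read;
        return read;
    }

    public static void main(String[] args) {
        RetentionResetModel small = new RetentionResetModel(1_000, 100_000);    // keeps ~100 records
        RetentionResetModel large = new RetentionResetModel(1_000, 10_000_000); // keeps ~10,000 records
        for (RetentionResetModel p : List.of(small, large)) {
            p.produce(1_000); // a burst arrives before the consumer has read anything
            long read = 0;
            long r;
            while ((r = p.consume(40)) > 0) {
                read += r;
            }
            System.out.println("read=" + read + ", lost=" + p.lost + ", position=" + p.position);
        }
    }
}
```

Running it prints "read=0, lost=1000, position=1000" for the small retention and "read=1000, lost=0, position=1000" for the large one: in both cases the position ends at the log end, but only the large retention actually delivers the records. In a real cluster the corresponding topic-level settings are retention.bytes (where -1 means no size limit) and, for time-based deletion, retention.ms. Setting auto.offset.reset=earliest would resume from the oldest retained record instead, which skips fewer records but cannot recover ones that were already deleted.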
