Spring Cloud Stream Kafka - unbatching and missed heartbeats

Question  Votes: 0  Answers: 1

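I am looking at a Spring Boot service that reads messages from Apache Kafka, requests the records indicated by those messages from another service over HTTP, processes them, saves some data to a database, and publishes the results to another topic.

This is done via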

@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)

This is done in several services and generally works quite well. The only property set is

spring.cloud.stream.binder.consumer.concurrency=20

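The topic itself has 20 partitions, so that should fit.

While monitoring the reads from Kafka, we found very low throughput and strange behavior:

The application reads up to 500 messages at a time, and then nothing for 1-2 minutes. During that time, the consumer repeatedly logs that it is "missing heartbeats because partitions are being rebalanced" and "reassigning partitions", and sometimes even throws an exception saying it "failed to commit because the poll interval has elapsed".

We concluded that this means the consumer fetches 500 messages, takes a long time to process them all, misses its time window, and therefore cannot commit any of the 500 messages to the broker. The broker then reassigns the partitions and delivers the same messages again.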
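To make that reasoning concrete, here is a rough sketch of the arithmetic in plain Java. The class and method names are made up for illustration, and the 700 ms per-record time is an assumed figure, not a measurement from our service. The values 500 and 300000 ms are the Kafka client defaults for max.poll.records and max.poll.interval.ms.

```java
class PollBudget {

    /** Worst-case time to work through one poll() batch, in milliseconds. */
    static long batchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    /** True if the whole batch can be processed before the next poll() is due. */
    static boolean fitsInPollInterval(int maxPollRecords, long perRecordMillis, long maxPollIntervalMs) {
        return batchMillis(maxPollRecords, perRecordMillis) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // Kafka default for max.poll.interval.ms
        long perRecordMillis = 700L;       // assumed: HTTP call + processing + DB write

        // 500 * 700 ms = 350000 ms, which exceeds the interval
        System.out.println(fitsInPollInterval(500, perRecordMillis, maxPollIntervalMs)); // false
        // 20 * 700 ms = 14000 ms, which fits comfortably
        System.out.println(fitsInPollInterval(20, perRecordMillis, maxPollIntervalMs));  // true
    }
}
```

Under these assumptions, either a smaller batch or a longer poll interval would keep the consumer inside its window.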

After browsing threads and documentation, I found the "max.poll.records" property, but there is conflicting advice on where to set it.

Some say to set it under

spring.cloud.stream.bindings.consumer.<input>.configuration

Others say

spring.cloud.stream.kafka.binders.consumer-properties

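I tried setting both to 1, but the service's behavior did not change.

How do I correctly handle the situation where the consumer cannot keep up with the poll interval required by the default settings?

Shared YAML: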

spring.cloud.stream.default.group=${spring.application.name}

Service YAML:

spring:
  cloud:
    stream:
      default:
        consumer.headerMode: embeddedHeaders
        producer.headerMode: embeddedHeaders
      bindings:
       someOutput:
         destination: outTopic
       someInput:
         destination: inTopic
         consumer:
           concurrency: 30
      kafka:
        bindings:
          consumer:
            someInput:
              configuarion:
                max.poll.records: 20 # ConsumerConfig ignores this
              consumer:
                enableDlq: true
                configuarion:
                  max.poll.records: 30 # ConsumerConfig ignores this
          someInput:
            configuarion:
              max.poll.records: 20 # ConsumerConfig ignores this
            consumer:
              enableDlq: true
              configuarion:
                max.poll.records: 30 # ConsumerConfig ignores this
        binder:
          consumer-properties:
            max.poll.records: 10 # this gets used first
          configuration:
            max.poll.records: 40 # this gets used when the first one is not present

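"Ignores this" always means that, if no other property is set, ConsumerConfig keeps max.poll.records at its default of 500.

EDIT: We are getting closer:

The problem is related to Spring Retry, which had an exponentialBackoffStrategy configured, and to a series of errors that effectively halted the application.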
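To get a feel for how much time the retries alone can consume, here is a small plain-Java sketch that sums the sleeps of an exponential backoff. It only models the arithmetic and is not Spring Retry's implementation. The class name, the method name and all numbers are illustrative assumptions, not our actual retry configuration.

```java
class BackoffBudget {

    /**
     * Sums the sleep times between attempts for an exponential backoff:
     * the first sleep is initialMs, each later sleep is multiplied by
     * multiplier and capped at maxMs. No sleep follows the last attempt.
     */
    static long totalBackoffMillis(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        long total = 0;
        double sleep = initialMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            total += Math.min((long) sleep, maxMs);
            sleep *= multiplier;
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed values: 5 attempts, 1 s initial sleep, doubling, capped at 10 s.
        long perRecord = totalBackoffMillis(5, 1_000L, 2.0, 10_000L);
        System.out.println(perRecord);       // 1000 + 2000 + 4000 + 8000 = 15000

        // 200 failing records handled one after another on a single thread;
        // compare with the 300000 ms Kafka default for max.poll.interval.ms.
        System.out.println(200 * perRecord); // 3000000
    }
}
```

With numbers like these, a run of failing records spends almost all of its time sleeping between retries, which would match an application that appears to stand still.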

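What I don't get is this: we forced 200 errors by publishing malformed messages to the topic in question. The application read all 200, took its time (with the old retry configuration), and then committed all 200 errors at once.

What sense does this make, if any, given the following settings?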

max.poll.records: 1
concurrency: 1
ackEachRecord = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)
apache-kafka spring-cloud spring-cloud-stream
1 Answer
0 votes

It's

spring.cloud.stream.kafka.bindings.<input>.consumer.configuration.max.poll.records

See the documentation:

Kafka Consumer Properties

The following properties are available for Kafka consumers only and must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.consumer.

...

configuration

Map with a key/value pair containing generic Kafka consumer properties.

Default: Empty map.

...

You can also increase max.poll.interval.ms.

EDIT

I just tested with 2.1.0.RELEASE - it works as I described:

No settings

2019-03-01 08:47:59.560  INFO 44698 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 500
    ...

Boot default

spring.kafka.consumer.properties.max.poll.records=42

2019-03-01 08:49:49.197  INFO 45044 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 42
    ...

Binder default #1

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.consumer-properties.max.poll.records=43

2019-03-01 08:52:11.469  INFO 45842 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

Binder default #2

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43

2019-03-01 08:54:06.211  INFO 46252 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

Binding default

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44

2019-03-01 09:02:26.004  INFO 47833 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 44
    ...

Specific binding

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

2019-03-01 09:05:01.452  INFO 48330 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 45
    ...
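Taken together, these runs behave as if the settings were layered, with the more specific one winning: Kafka default, then Boot default, then binder default, then binding default, then the specific binding. A minimal plain-Java sketch of that layering follows. It only models the results logged above and is not the binder's actual merging code. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PollRecordsPrecedence {

    /** Applies the layers in order; a later layer overrides an earlier one. */
    static Map<String, String> resolve(List<Map<String, String>> layersLowToHigh) {
        Map<String, String> effective = new HashMap<>();
        effective.put("max.poll.records", "500"); // Kafka client default
        for (Map<String, String> layer : layersLowToHigh) {
            effective.putAll(layer);
        }
        return effective;
    }

    public static void main(String[] args) {
        // One map per layer, using the values from the test runs.
        Map<String, String> boot = Collections.singletonMap("max.poll.records", "42");
        Map<String, String> binder = Collections.singletonMap("max.poll.records", "43");
        Map<String, String> bindingDefault = Collections.singletonMap("max.poll.records", "44");
        Map<String, String> specific = Collections.singletonMap("max.poll.records", "45");

        System.out.println(resolve(Arrays.asList(boot, binder)).get("max.poll.records")); // 43
        System.out.println(resolve(Arrays.asList(boot, binder, bindingDefault, specific))
                .get("max.poll.records")); // 45
    }
}
```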

I see.

Here is the complete test application. I just created a new application at http://start.spring.io and selected "Kafka" and "Cloud Stream".

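import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class So54932453Application {

    public static void main(String[] args) {
        // Start the context so the consumer logs its ConsumerConfig values, then shut down.
        SpringApplication.run(So54932453Application.class, args).close();
    }

    // Intentionally empty: the test only inspects the logged consumer configuration.
    @StreamListener(Sink.INPUT)
    public void listen(String in) {
    }

}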

spring.cloud.stream.bindings.input.group=so54932453

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>net.gprussell</groupId>
    <artifactId>so54932453</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>so54932453</name>
    <description>Demo</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>