Spring Cloud Stream Rabbit Binder 并发

问题描述 投票:0回答:1

我有一个消费者微服务,用于处理来自其他生产者微服务的消息。我使用 Spring Cloud Stream 和 RabbitMq Binder 管理事件驱动结构。
我想限制消费者微服务消耗的最大并发消息数。我的意思是,如果生产者发送 7 条消息,如果我将要消费的最大消息数限制为 5 条。我预计消费者微服务将消费前 5 条消息。如果任何消息处理完成,我预计会消耗第 6 条消息。我尝试使用 RabbitMq 消费者属性“max-concurrenty:5”来管理它,但它不起作用。

如果有 2 个消费者微服务正在运行且最大并发数为 5,我预计每个消费者微服务最多应消费 5 条消息。他们总共应该消耗 10 条消息。此外,消息应该以平衡的方式在微服务之间分发。

这是我在 application.yml 中的活页夹配置


spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
  autoconfigure:
    exclude: org.springframework.boot.actuate.autoconfigure.metrics.jdbc.DataSourcePoolMetricsAutoConfiguration
  application:
    name: simulationService
  cloud:
    function:
      definition: functionRouter
    stream:
      function:
        routing:
          enabled: true
      bindings:
        callBack-out-0:
          destination: simulation-workflow
          content-type: application/json
          group: workflow
        functionRouter-in-0:
          destination: simulation
          content-type: application/json
          group: simulation
      rabbit:
        bindings:
           functionRouter:
             consumer:
               max-concurrency: 5

java spring-boot rabbitmq spring-cloud-stream
1个回答
0
投票

concurrency = 5
表示每个实例将获得5个消费者;每个消费者都有一个
prefetch
属性(Spring AMQP 中默认为 250,spring-cloud-stream 中默认为 1),该属性会影响消息分发。

1 应在消费者之间提供均匀的分配,但要以性能为代价。

无法将消费者“限制”为仅 5 条消息,但

prefetch = 5
可能会满足您的需求。

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties

预取

预取计数。

默认:1。

© www.soinside.com 2019 - 2024. All rights reserved.