具有两个输入主题的函数路由似乎破坏了 KafkaBinderMetrics

问题描述 投票:0回答:1

根据https://stackoverflow.com/a/66871632/22992363,我正在配置一个功能路由器,它接受来自两个 Kafka 主题的输入。这是我的配置的精简副本:

spring:
  cloud:
    function:
      routing-expression: "headers['eventType']"
    stream:
      bindings:
        functionRouter-in-0:
          destination: order.requests,order.events
          binder: kafka
          content-type: application/json
          group: ${spring.application.name}-app
          consumer:
            concurrency: 12
            autoStartup: true

应用程序启动时,出现异常:

2023-12-19 11:45:25.505 DEBUG [order-fulfilment-svc,,,] 21186 --- [pool-3-thread-1] o.s.c.s.binder.kafka.KafkaBinderMetrics  : Cannot generate metric for topic: order.requests

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pool-3-thread-1, id: 65) otherThread(id: 66)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2551)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2532)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1993)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1969)
    at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.findTotalTopicGroupLag(KafkaBinderMetrics.java:204)
    at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.computeUnconsumedMessages(KafkaBinderMetrics.java:189)
    at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.lambda$bindTo$0(KafkaBinderMetrics.java:154)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:305)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

我尝试将并发重置为 1,但这并不能消除问题。

删除第二个主题确实会使异常消失。

我正在使用 SCS 3.2.9 和 spring-kafka 2.9.13。

这是一个错误吗?

spring-kafka spring-cloud-stream
1个回答
0
投票
每次收到

KafkaBinderMetrics

 事件时,
KafkaMessageChannelBinder
调用
BindingCreatedEvent
。此活动是针对 functionRouter-in-0 的每个主题
创建
。换句话说,在处理
order.requests
时,我们针对 this.scheduler.scheduleWithFixedDelay
 主题运行 
order.requests
。处理 
order.events
时,我们为
this.scheduler.scheduleWithFixedDelay
主题运行
order.requests
(此处创建了一个新线程)和
order.events

总而言之,问题在于每次初始化新主题时,

KafkaBinderMetrics
都会从
KafkaMessageChannelBinder
获取主题列表。初始化的主题越多,
KafkaMessageChannelBinder
中的主题就越多。

解决问题有3种变体:

  • 保持一切不变 - 缺点:可以禁用的日志条目
  • 禁用指标计数 - 缺点:没有指标
  • 覆盖 KafkaBinderMetrics

github

上发布

禁用指标:

@Bean
public MeterFilter disableKafkaOffsetMetrics() {
    return MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME);
}
© www.soinside.com 2019 - 2024. All rights reserved.