根据https://stackoverflow.com/a/66871632/22992363,我正在配置一个功能路由器,它接受来自两个 Kafka 主题的输入。这是我的配置的精简副本:
spring:
cloud:
function:
routing-expression: "headers['eventType']"
stream:
bindings:
functionRouter-in-0:
destination: order.requests,order.events
binder: kafka
content-type: application/json
group: ${spring.application.name}-app
consumer:
concurrency: 12
autoStartup: true
应用程序启动时,出现异常:
2023-12-19 11:45:25.505 DEBUG [order-fulfilment-svc,,,] 21186 --- [pool-3-thread-1] o.s.c.s.binder.kafka.KafkaBinderMetrics : Cannot generate metric for topic: order.requests
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pool-3-thread-1, id: 65) otherThread(id: 66)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2551)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2532)
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1993)
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1969)
at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.findTotalTopicGroupLag(KafkaBinderMetrics.java:204)
at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.computeUnconsumedMessages(KafkaBinderMetrics.java:189)
at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.lambda$bindTo$0(KafkaBinderMetrics.java:154)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:305)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
我尝试将并发重置为 1,但这并不能消除问题。
删除第二个主题确实会使异常消失。
我正在使用 SCS 3.2.9 和 spring-kafka 2.9.13。
这是一个错误吗?
KafkaBinderMetrics
事件时,
KafkaMessageChannelBinder
调用 BindingCreatedEvent
。此活动是针对 functionRouter-in-0
的每个主题创建。换句话说,在处理
order.requests
时,我们针对 this.scheduler.scheduleWithFixedDelay
主题运行
order.requests
。处理 order.events
时,我们为 this.scheduler.scheduleWithFixedDelay
主题运行 order.requests
(此处创建了一个新线程)和 order.events
。
总而言之,问题在于每次初始化新主题时,
KafkaBinderMetrics
都会从KafkaMessageChannelBinder
获取主题列表。初始化的主题越多,KafkaMessageChannelBinder
中的主题就越多。
解决问题有3种变体:
在 github
上发布禁用指标:
@Bean
public MeterFilter disableKafkaOffsetMetrics() {
return MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME);
}