我有一个spring cloud kafka绑定消费者服务。主要消费者主题和 dlq 消费者位于不同的经纪人中。因此我需要创建多个绑定。
我面临的问题是,当我只有一个绑定和相应的属性时,
ListenerContainerCustomizer
自定义DefaultErrorHandler
在应用程序属性中定义的无状态重试之后被调用。
代码:
@Configuration
@Slf4j
public class KafkaListenerConfig {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
KafkaTemplate<String, byte[]> secondaryKafkaTemplate) {
log.info("inside ListenerContainerCustomizer ");
return (container, destinationName, group) -> {
log.info("inside destination {}", destinationName);
log.info("inside destination name {}", destinationName);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
secondaryKafkaTemplate,
(cr, e) -> {
log.error("Error processing message in topic {} at partition {} with offset {}.",
cr.topic(), cr.partition(), cr.offset());
return new TopicPartition("myConsumer-dlq", cr.partition());
}
);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0L)); // No retries
container.setCommonErrorHandler(errorHandler);
};
}
}
我的application.yml
spring:
cloud:
stream:
bindings:
EventConsumer-in-0:
binder: mainKafka
content-type: application/json
consumer:
max-attempts: 2
back-off-initial-interval: 100 #10sec
back-off-max-interval: 900 #15min
back-off-multiplier: 1.5
EventConsumerDlq-in-0:
binder: dlqKafka
content-type: application/json
consumer:
max-attempts: 0
ack-off-initial-interval: 100
back-off-max-interval: 900
back-off-multiplier: 1.5
现在,下面我首先将 application-local.yml 与单个绑定共享,该绑定工作正常并且默认错误处理程序被调用,意味着日志“在目标内部”被打印。但在多重绑定配置情况下不会调用它。
此外,当日志
log.info("inside ListenerContainerCustomizer ");
在这两种情况下执行时,bean 都会被正确注入。
其次,我共享具有多重绑定的 application-local.yml,其中日志不会被打印,容器中的逻辑不会被调用(产生事件)。
另外,请注意,在多重绑定的情况下,我注释掉了第二个绑定(dlq 消费者),但在 defaulterrorhandler 容器中也不会调用生产者逻辑。
工作application-local.yml
spring:
cloud:
stream:
bindings:
EventConsumer-in-0:
destination: my-change-record.v2
group: my-change-record.v2-local
consumer:
multiplex: true
autoStartup: true
EventConsumerDlq-in-0:
destination: EventConsumer-dlq
group: dlq-group-local
consumer:
autoStartup: true
kafka:
binder:
brokers: localhost:9093
configuration:
max.poll.interval.ms: 1080000
bindings:
EventConsumer-in-0:
consumer:
ack-mode: MANUAL
不工作 application-local.yml
spring:
cloud:
stream:
default-binder: mainKafka
binders:
mainKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
configuration:
max.poll.interval.ms: 1080000
dlqKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka-1.zone.org:9092,kafka-2.zone.org:9092,kafka-3.zone.org:9092
bindings:
EventConsumer-in-0:
binder: mainKafka
destination: change-record.v2
group: change-record.v2-local
EventConsumerDlq-in-0:
binder: dlqKafka
destination: EventConsumer-dlq
group: dlq-group-local
kafka:
bindings:
EventConsumer-in-0:
binder: mainKafka
consumer:
multiplex: true
autoStartup: true
ack-mode: MANUAL
EventConsumerDlq-in-0:
binder: dlqKafka
consumer:
autoStartup: true
为什么不能将
EventonsumerDlq-in-0
与原始绑定一起提供,而不是创建第二个绑定 (dlqProducerProperties
)?您可以在那里提供第二个 Kafka 集群的引导配置。例如,
...
kafka:
bindings:
EventConsumer-in-0:
binder: mainKafka
consumer:
multiplex: true
autoStartup: true
ack-mode: MANUAL
dlqProducerProperties:
configuration:
bootstrap-server: <DLQ cluster server info>