Custom DefaultErrorHandler not invoked when using multiple Kafka binders


I have a Spring Cloud Stream Kafka consumer service. The main consumer topic and the DLQ consumer live on different brokers, so I need to configure multiple binders.

The problem I am facing: when I have only a single binder and the corresponding properties, the DefaultErrorHandler customized through the ListenerContainerCustomizer is invoked after the stateless retries defined in the application properties.

Code:

import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
import lombok.extern.slf4j.Slf4j;

@Configuration
@Slf4j
public class KafkaListenerConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
            KafkaTemplate<String, byte[]> secondaryKafkaTemplate) {
        log.info("inside ListenerContainerCustomizer");
        return (container, destinationName, group) -> {
            log.info("inside destination {}", destinationName);
            DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                secondaryKafkaTemplate,
                (cr, e) -> {
                    log.error("Error processing message in topic {} at partition {} with offset {}.",
                        cr.topic(), cr.partition(), cr.offset());
                    return new TopicPartition("myConsumer-dlq", cr.partition());
                }
            );
            // FixedBackOff(0L, 0L): no retries before recovery
            DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0L));
            container.setCommonErrorHandler(errorHandler);
        };
    }
}
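As an aside on the FixedBackOff(0L, 0L) argument above: the second constructor argument is the maximum number of retry attempts, so 0 disables in-place retries entirely and the handler goes straight to the recoverer. The contract can be sketched in plain Java (an illustrative standalone re-implementation, not Spring's org.springframework.util.backoff.FixedBackOff):

```java
// Illustrative sketch of the fixed back-off contract: nextBackOff() returns
// the interval while retry attempts remain, then STOP (-1). Standalone code
// for explanation only, not Spring's implementation.
final class FixedBackOffSketch {
    static final long STOP = -1L;

    private final long intervalMs;
    private final long maxAttempts;
    private long attempts = 0;

    FixedBackOffSketch(long intervalMs, long maxAttempts) {
        this.intervalMs = intervalMs;
        this.maxAttempts = maxAttempts;
    }

    /** Next delay in ms, or STOP when retries are exhausted. */
    long nextBackOff() {
        return (attempts++ < maxAttempts) ? intervalMs : STOP;
    }

    public static void main(String[] args) {
        FixedBackOffSketch noRetries = new FixedBackOffSketch(0L, 0L);
        System.out.println(noRetries.nextBackOff());   // -1: stop immediately

        FixedBackOffSketch twoRetries = new FixedBackOffSketch(100L, 2L);
        System.out.println(twoRetries.nextBackOff());  // 100
        System.out.println(twoRetries.nextBackOff());  // 100
        System.out.println(twoRetries.nextBackOff());  // -1
    }
}
```

With maxAttempts = 0 the very first call yields STOP, which is why the customizer above publishes failed records to the DLQ without retrying.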

My application.yml

spring:
  cloud:
    stream:
      bindings:
        EventConsumer-in-0:
          binder: mainKafka
          content-type: application/json
          consumer:
            max-attempts: 2
            back-off-initial-interval: 100 # ms
            back-off-max-interval: 900 # ms
            back-off-multiplier: 1.5
        EventConsumerDlq-in-0:
          binder: dlqKafka
          content-type: application/json
          consumer:
            max-attempts: 0
            back-off-initial-interval: 100
            back-off-max-interval: 900
            back-off-multiplier: 1.5
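For reference, the delays implied by these properties follow simple exponential back-off arithmetic: each retry waits the previous delay times the multiplier, capped at the max interval. A standalone sketch of that arithmetic (not Spring Retry's code, just the calculation the properties describe):

```java
// Standalone sketch of the exponential back-off arithmetic configured above:
// initial interval 100 ms, multiplier 1.5, capped at 900 ms.
final class BackOffDelays {
    static long[] delays(long initialMs, double multiplier, long maxMs, int retries) {
        long[] out = new long[retries];
        double d = initialMs;
        for (int i = 0; i < retries; i++) {
            out[i] = Math.min((long) d, maxMs);  // cap at back-off-max-interval
            d *= multiplier;                     // grow by back-off-multiplier
        }
        return out;
    }

    public static void main(String[] args) {
        // Shown with several retries to illustrate the cap kicking in.
        System.out.println(java.util.Arrays.toString(delays(100L, 1.5, 900L, 8)));
        // [100, 150, 225, 337, 506, 759, 900, 900]
    }
}
```

Note that with max-attempts: 2 (two total attempts, i.e. one retry) only the first 100 ms delay is ever used before the error handler takes over.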

Below I first share the application-local.yml with a single binder; that configuration works fine and the custom error handler is applied, meaning the "inside destination" log is printed. With the multi-binder configuration it is not invoked.

Also, the log line log.info("inside ListenerContainerCustomizer"); is executed in both cases, so the bean itself is injected correctly.

Second, here is the application-local.yml with multiple binders, where that log is not printed and the logic inside the customizer is never invoked (when events are produced).

Note also that in the multi-binder case I tried commenting out the second binding (the DLQ consumer), but the producer logic inside the DefaultErrorHandler is still not invoked.

Working application-local.yml

spring:
    cloud:
        stream:
            bindings:
                EventConsumer-in-0:
                    destination: my-change-record.v2
                    group: my-change-record.v2-local
                    consumer:
                        multiplex: true
                        autoStartup: true
                EventConsumerDlq-in-0:
                    destination: EventConsumer-dlq
                    group: dlq-group-local
                    consumer:
                        autoStartup: true
            kafka:
                binder:
                    brokers: localhost:9093
                    configuration:
                        max.poll.interval.ms: 1080000
                bindings:
                    EventConsumer-in-0:
                        consumer:
                            ack-mode: MANUAL

Non-working application-local.yml

spring:
  cloud:
    stream:
      default-binder: mainKafka
      binders:
        mainKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        max.poll.interval.ms: 1080000
        dlqKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: kafka-1.zone.org:9092,kafka-2.zone.org:9092,kafka-3.zone.org:9092
      bindings:
        EventConsumer-in-0:
          binder: mainKafka
          destination: change-record.v2
          group: change-record.v2-local
        EventConsumerDlq-in-0:
          binder: dlqKafka
          destination: EventConsumer-dlq
          group: dlq-group-local
      kafka:
        bindings:
          EventConsumer-in-0:
            binder: mainKafka
            consumer:
              multiplex: true
              autoStartup: true
              ack-mode: MANUAL
          EventConsumerDlq-in-0:
            binder: dlqKafka
            consumer:
              autoStartup: true
apache-kafka spring-cloud spring-cloud-stream
1 Answer

Instead of creating the second binding (EventConsumerDlq-in-0), why not provide dlqProducerProperties along with the original binding? There you can supply the bootstrap configuration of the second Kafka cluster. For example,

...
kafka:
  bindings:
    EventConsumer-in-0:
      consumer:
        multiplex: true
        autoStartup: true
        ack-mode: MANUAL
        dlqProducerProperties:
          configuration:
            bootstrap.servers: <DLQ cluster server info>