Using Spring Cloud Stream 3.0.9.RELEASE


I want to inject a bean into a custom ConsumerInterceptor in Spring Cloud Stream 3.0.9.RELEASE, now that ConsumerConfigCustomizer has been added. However, the injected bean is always null.

Foo (the dependency to be injected into MyConsumerInterceptor)

public class Foo {

    public void foo(String what) {
        System.out.println(what);
    }
}

MyConsumerInterceptor (the custom Kafka ConsumerInterceptor)

public static class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private Foo foo;

    @Override
    public void configure(Map<String, ?> configs) {
        this.foo = (Foo) configs.get("fooBean");    // Comment_1
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        this.foo.foo("consumer interceptor");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

    }

    @Override
    public void close() {
    }
}
@SpringBootApplication
public static class ConfigCustomizerTestConfig {


    @Bean
    public ConsumerConfigCustomizer consumerConfigCustomizer() {
        return new ConsumerConfigCustomizer() {
            @Override
            public void configure(Map<String, Object> consumerProperties) {
                consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
                consumerProperties.put("fooBean", foo());
            }
        };
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }
}

Application.yml

spring:
  cloud:
    function:
      definition: consume;
    stream:
      function:
        bindings:
          consume-in-0: input
      bindings:
        input:
          destination: students
          group: groupA
          binder: kafkaBinder
      binders:
        kafkaBinder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <IPs>

Dependencies

    <dependencies>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>3.0.9.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-function-context</artifactId>
            <version>3.0.11.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>3.0.9.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
            <version>3.0.9.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

    </dependencies>

Logs:

15:19:05.732 [main] INFO  DefaultConfiguringBeanFactoryPostProcessor   - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
15:19:05.735 [main] INFO  DefaultConfiguringBeanFactoryPostProcessor   - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
15:19:05.737 [main] INFO  DefaultConfiguringBeanFactoryPostProcessor   - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
15:19:05.764 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.768 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.773 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.776 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.786 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.787 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.935 [main] INFO  TomcatWebServer   - Tomcat initialized with port(s): 8080 (http)
15:19:05.940 [main] INFO  Http11NioProtocol   - Initializing ProtocolHandler ["http-nio-8080"]
15:19:05.941 [main] INFO  StandardService   - Starting service [Tomcat]
15:19:05.941 [main] INFO  StandardEngine   - Starting Servlet engine: [Apache Tomcat/9.0.38]
15:19:05.983 [main] INFO  [/]   - Initializing Spring embedded WebApplicationContext
15:19:05.983 [main] INFO  ServletWebServerApplicationContext   - Root WebApplicationContext: initialization completed in 711 ms
15:19:06.119 [main] INFO  ThreadPoolTaskExecutor   - Initializing ExecutorService 'applicationTaskExecutor'
15:19:06.377 [main] INFO  ThreadPoolTaskScheduler   - Initializing ExecutorService 'taskScheduler'
15:19:06.385 [main] INFO  SimpleFunctionRegistry   - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.402 [main] INFO  SimpleFunctionRegistry   - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.403 [main] INFO  SimpleFunctionRegistry   - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.418 [main] INFO  DirectWithAttributesChannel   - Channel 'application.input' has 1 subscriber(s).
15:19:06.421 [main] INFO  SimpleFunctionRegistry   - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.501 [main] INFO  IntegrationMBeanExporter   - Registering MessageChannel nullChannel
15:19:06.514 [main] INFO  IntegrationMBeanExporter   - Registering MessageChannel input
15:19:06.546 [main] INFO  IntegrationMBeanExporter   - Registering MessageChannel errorChannel
15:19:06.566 [main] INFO  IntegrationMBeanExporter   - Registering MessageHandler errorLogger
15:19:06.580 [main] INFO  EventDrivenConsumer   - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
15:19:06.580 [main] INFO  PublishSubscribeChannel   - Channel 'application.errorChannel' has 1 subscriber(s).
15:19:06.581 [main] INFO  EventDrivenConsumer   - started bean '_org.springframework.integration.errorLogger'
15:19:06.581 [main] INFO  DefaultBinderFactory   - Creating binder: kafkaBinder
15:19:06.672 [main] INFO  DefaultBinderFactory   - Caching the binder: kafkaBinder
15:19:06.672 [main] INFO  DefaultBinderFactory   - Retrieving cached binder: kafkaBinder
15:19:06.716 [main] INFO  AdminClientConfig   - AdminClientConfig values: 
    bootstrap.servers = [192.168.86.23:9092]
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

15:19:06.740 [main] INFO  AppInfoParser   - Kafka version: 2.5.1
15:19:06.740 [main] INFO  AppInfoParser   - Kafka commitId: 0efa8fb0f4c73d92
15:19:06.740 [main] INFO  AppInfoParser   - Kafka startTimeMs: 1605302346740
15:19:06.979 [main] INFO  ConsumerConfig   - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 100
    auto.offset.reset = earliest
    bootstrap.servers = [192.168.86.23:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = groupA
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

15:19:06.997 [main] INFO  AppInfoParser   - Kafka version: 2.5.1
15:19:06.997 [main] INFO  AppInfoParser   - Kafka commitId: 0efa8fb0f4c73d92
15:19:06.997 [main] INFO  AppInfoParser   - Kafka startTimeMs: 1605302346997
15:19:07.014 [main] INFO  Metadata   - [Consumer clientId=consumer-groupA-1, groupId=groupA] Cluster ID: WmLQLxSaRrqxST80G6w-5w
15:19:07.031 [main] INFO  BinderErrorChannel   - Channel 'students.groupA.errors' has 1 subscriber(s).
15:19:07.031 [main] INFO  BinderErrorChannel   - Channel 'students.groupA.errors' has 0 subscriber(s).
15:19:07.031 [main] INFO  BinderErrorChannel   - Channel 'students.groupA.errors' has 1 subscriber(s).
15:19:07.031 [main] INFO  BinderErrorChannel   - Channel 'students.groupA.errors' has 2 subscriber(s).
15:19:07.038 [main] INFO  ConsumerConfig   - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 100
    auto.offset.reset = earliest
    bootstrap.servers = [192.168.86.23:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = groupA
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

15:19:07.040 [main] INFO  AppInfoParser   - Kafka version: 2.5.1
15:19:07.040 [main] INFO  AppInfoParser   - Kafka commitId: 0efa8fb0f4c73d92
15:19:07.040 [main] INFO  AppInfoParser   - Kafka startTimeMs: 1605302347040
15:19:07.041 [main] INFO  KafkaConsumer   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Subscribed to topic(s): students
15:19:07.042 [main] INFO  ThreadPoolTaskScheduler   - Initializing ExecutorService
15:19:07.044 [main] INFO  KafkaMessageDrivenChannelAdapter   - started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6232ffdb
15:19:07.045 [main] INFO  Http11NioProtocol   - Starting ProtocolHandler ["http-nio-8080"]
15:19:07.055 [main] INFO  TomcatWebServer   - Tomcat started on port(s): 8080 (http) with context path ''
15:19:07.059 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  Metadata   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Cluster ID: WmLQLxSaRrqxST80G6w-5w
15:19:07.060 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Discovered group coordinator DESKTOP-8T65SGP.lan:9092 (id: 2147483647 rack: null)
15:19:07.061 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] (Re-)joining group
15:19:07.066 [main] INFO  Application   - Started Application in 2.093 seconds (JVM running for 2.429)
15:19:07.074 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
15:19:07.074 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] (Re-)joining group
15:19:07.079 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  ConsumerCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Finished assignment for group at generation 73: {consumer-groupA-2-4e498561-d307-409b-bb51-09a57eff8b81=Assignment(partitions=[students-0])}
15:19:07.084 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Successfully joined group with generation 73
15:19:07.086 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  ConsumerCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Adding newly assigned partitions: students-0
15:19:07.094 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  ConsumerCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Setting offset for partition students-0 to the committed offset FetchPosition{offset=19, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[DESKTOP-8T65SGP.lan:9092 (id: 0 rack: null)], epoch=0}}
15:19:07.095 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  KafkaMessageChannelBinder$1   - groupA: partitions assigned: [students-0]
15:19:45.344 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  MessagingMethodInvokerHelper   - Overriding default instance of MessageHandlerMethodFactory with provided one.
Kafka message received---> student info received

#Comment_1: After this line executes, this.foo is still null.

#Comment_2: I don't think this configuration is necessary, because ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG is already specified in ConfigCustomizerTestConfig. However, if that configuration is removed from application.yml, the line of code around Comment_1 is never hit.
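(The removed YAML is not shown in the question. For illustration only, a binding-level interceptor setting of the kind Comment_2 describes would typically look like the sketch below; the property path is assumed from the Kafka binder's per-binding consumer configuration mechanism and is not taken from this thread.)

spring:
  cloud:
    stream:
      kafka:
        bindings:
          input:
            consumer:
              configuration:
                # hypothetical: fully-qualified name of the interceptor class
                interceptor.classes: com.example.MyConsumerInterceptor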

I think I am missing something in my code. Any suggestions would be greatly appreciated.

Update: I cannot find fooBean or MyConsumerInterceptor anywhere in the consumer configuration printed in the logs.

spring-cloud-stream spring-cloud-stream-binder-kafka
2 Answers

2 votes

Ah - you are using a named binder for multi-binder support; this technique currently only works with a single top-level binder. In that case, the interceptor.classes property is empty; that is why you needed to add it to the YAML. This is a known problem with the multi-binder support. If you are only using one binder, don't name it; just define it at the top level.

(See the EDIT below.)
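For illustration, a single top-level binder version of the question's YAML would look roughly like this (a sketch derived from the configuration shown above, not taken from the answer; the brokers move to the top-level Kafka binder section and the named binder disappears):

spring:
  cloud:
    function:
      definition: consume
    stream:
      function:
        bindings:
          consume-in-0: input
      bindings:
        input:
          destination: students
          group: groupA
      kafka:
        binder:
          brokers: <IPs>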

Here is a work-around for named binders - override the binder factory bean. Unfortunately, I had to copy some private code, but this works...

@SpringBootApplication
public class So64826496Application {

    public static void main(String[] args) {
        SpringApplication.run(So64826496Application.class, args);
    }

    public static class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

        private Foo foo;

        @Override
        public void configure(Map<String, ?> configs) {
            this.foo = (Foo) configs.get("fooBean");
            System.out.println(this.foo);
        }

        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            this.foo.foo("consumer interceptor");
            return records;
        }

        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

        }

        @Override
        public void close() {
        }
    }

    // BEGIN HACK TO INJECT CUSTOMIZER INTO BINDER

    @Autowired(required = false)
    private Collection<DefaultBinderFactory.Listener> binderFactoryListeners;

    @Bean
    public BinderFactory binderFactory(BinderTypeRegistry binderTypeRegistry,
            BindingServiceProperties bindingServiceProperties) {

        DefaultBinderFactory binderFactory = new DefaultBinderFactory(
                getBinderConfigurations(binderTypeRegistry, bindingServiceProperties),
                binderTypeRegistry) {

                    @Override
                    public synchronized <T> Binder<T, ?, ?> getBinder(String name,
                            Class<? extends T> bindingTargetType) {

                        Binder<T, ?, ?> binder = super.getBinder(name, bindingTargetType);
                        if (binder instanceof KafkaMessageChannelBinder) {
                            ((KafkaMessageChannelBinder) binder).setConsumerConfigCustomizer(consumerConfigCustomizer());
                        }
                        return binder;
                    }


        };
        binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder());
        binderFactory.setListeners(this.binderFactoryListeners);
        return binderFactory;
    }

    // Had to copy this because it's private in BindingServiceConfiguration
    private static Map<String, BinderConfiguration> getBinderConfigurations(
            BinderTypeRegistry binderTypeRegistry,
            BindingServiceProperties bindingServiceProperties) {

        Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
        Map<String, BinderProperties> declaredBinders = bindingServiceProperties
                .getBinders();
        boolean defaultCandidatesExist = false;
        Iterator<Map.Entry<String, BinderProperties>> binderPropertiesIterator = declaredBinders
                .entrySet().iterator();
        while (!defaultCandidatesExist && binderPropertiesIterator.hasNext()) {
            defaultCandidatesExist = binderPropertiesIterator.next().getValue()
                    .isDefaultCandidate();
        }
        List<String> existingBinderConfigurations = new ArrayList<>();
        for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders
                .entrySet()) {
            BinderProperties binderProperties = binderEntry.getValue();
            if (binderTypeRegistry.get(binderEntry.getKey()) != null) {
                binderConfigurations.put(binderEntry.getKey(),
                        new BinderConfiguration(binderEntry.getKey(),
                                binderProperties.getEnvironment(),
                                binderProperties.isInheritEnvironment(),
                                binderProperties.isDefaultCandidate()));
                existingBinderConfigurations.add(binderEntry.getKey());
            }
            else {
                Assert.hasText(binderProperties.getType(),
                        "No 'type' property present for custom binder "
                                + binderEntry.getKey());
                binderConfigurations.put(binderEntry.getKey(),
                        new BinderConfiguration(binderProperties.getType(),
                                binderProperties.getEnvironment(),
                                binderProperties.isInheritEnvironment(),
                                binderProperties.isDefaultCandidate()));
                existingBinderConfigurations.add(binderEntry.getKey());
            }
        }
        for (Map.Entry<String, BinderConfiguration> configurationEntry : binderConfigurations
                .entrySet()) {
            if (configurationEntry.getValue().isDefaultCandidate()) {
                defaultCandidatesExist = true;
            }
        }
        if (!defaultCandidatesExist) {
            for (Map.Entry<String, BinderType> binderEntry : binderTypeRegistry.getAll()
                    .entrySet()) {
                if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
                    binderConfigurations.put(binderEntry.getKey(),
                            new BinderConfiguration(binderEntry.getKey(), new HashMap<>(),
                                    true, "integration".equals(binderEntry.getKey()) ? false : true));
                }
            }
        }
        return binderConfigurations;
    }

    // END HACK

    @Bean
    public ConsumerConfigCustomizer consumerConfigCustomizer() {
        return new ConsumerConfigCustomizer() {
            @Override
            public void configure(Map<String, Object> consumerProperties) {
                consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        MyConsumerInterceptor.class.getName());
                consumerProperties.put("fooBean", foo());
            }
        };
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    @Bean
    Consumer<String> consume() {
        return System.out::println;
    }

}

class Foo {

    void foo(String in) {
        System.out.println(in);
    }

}

And:

$ kafka-console-producer --bootstrap-server localhost:9092 --topic consume-in-0
>foo

Result:

consumer interceptor
foo

EDIT

Starting with version 3.1, there is now a BinderCustomizer, which can be used to call addConsumerConfigCustomizer on a specific binder: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binder-customizer

Here is the snippet from the documentation...

@Bean
public BinderCustomizer binderCustomizer() {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setRebalanceListener(...);
        }
        else if (binder instanceof KStreamBinder) {
            ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            ...
        }
    };
}
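Applied to this question, a BinderCustomizer could install the existing ConsumerConfigCustomizer bean on the named binder only, avoiding the binder factory override entirely. A minimal sketch, assuming the addConsumerConfigCustomizer method mentioned above and the binder name from the question's YAML:

@Bean
public BinderCustomizer binderCustomizer(ConsumerConfigCustomizer consumerConfigCustomizer) {
    return (binder, binderName) -> {
        // Only customize the named Kafka binder from the question's YAML
        if (binder instanceof KafkaMessageChannelBinder && "kafkaBinder".equals(binderName)) {
            ((KafkaMessageChannelBinder) binder).addConsumerConfigCustomizer(consumerConfigCustomizer);
        }
    };
}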

0 votes

I used Gary Russell's answer above. Since it took me some time to get it working, I am posting a shorter version.

Tested with:

  • Spring Boot 2.7.2
  • Spring Cloud 3.1.3
  • Spring Cloud Kafka binder 3.2.4
  • Kotlin 1.5.31

Create a configuration class that puts the dependency into consumerProperties, to bridge the missing native injection support.

@Configuration
class KafkaBinderCustomizer(private val foo: Foo) {

    companion object {
        // Logger was referenced but not declared in the original snippet
        private val logger = LoggerFactory.getLogger(KafkaBinderCustomizer::class.java)
    }

    @Bean
    fun consumerConfigCustomizer(): ConsumerConfigCustomizer =
        ConsumerConfigCustomizer { consumerProperties, bindingName, _ ->
            // Key the bean under "foo" so the interceptor can look it up in configure()
            consumerProperties[Foo::class.simpleName!!.lowercase()] = foo
            logger.info("Injecting foo services to kafka {} interceptor", bindingName)
        }
}

Set the dependency in the interceptor by overriding the configure method:

class MyConsumerInterceptor : ConsumerInterceptor<String, String> {
    private lateinit var foo: Foo

    // ...other ConsumerInterceptor methods (onConsume, onCommit, close) omitted

    override fun configure(configs: MutableMap<String, *>) {
        foo = configs[Foo::class.simpleName!!.lowercase()] as Foo
    }
}

For a ProducerInterceptor, replace ConsumerConfigCustomizer with ProducerConfigCustomizer.
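For example, a producer-side equivalent might look like the Java sketch below. MyProducerInterceptor is a hypothetical ProducerInterceptor implementation (not shown in this thread), and the three-argument customizer signature is assumed from the Kafka binder version used in this answer:

@Bean
public ProducerConfigCustomizer producerConfigCustomizer(Foo foo) {
    return (producerProperties, bindingName, destination) -> {
        // Register the hypothetical producer interceptor and pass it the bean
        producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                MyProducerInterceptor.class.getName());
        producerProperties.put("fooBean", foo);
    };
}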
