I am trying to inject a bean into a custom ConsumerInterceptor in Spring Cloud Stream 3.0.9.RELEASE, now that ConsumerConfigCustomizer has been added. However, the injected bean is always NULL.
Foo (the dependency to be injected into MyConsumerInterceptor)
public class Foo {
public void foo(String what) {
System.out.println(what);
}
}
MyConsumerInterceptor (the custom Kafka ConsumerInterceptor)
public static class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Foo foo;
@Override
public void configure(Map<String, ?> configs) {
this.foo = (Foo) configs.get("fooBean"); // Comment_1
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.foo.foo("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
@SpringBootApplication
public static class ConfigCustomizerTestConfig {
@Bean
public ConsumerConfigCustomizer consumerConfigCustomizer() {
return new ConsumerConfigCustomizer() {
@Override
public void configure(Map<String, Object> consumerProperties) {
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("fooBean", foo());
}
};
}
@Bean
public Foo foo() {
return new Foo();
}
}
Application.yml
spring:
cloud:
function:
definition: consume;
stream:
function:
bindings:
consume-in-0: input
bindings:
input:
destination: students
group: groupA
binder: kafkaBinder
binders:
kafkaBinder:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: <IPs>
Dependencies
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>3.0.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>
<version>3.0.11.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.0.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>3.0.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>
<version>3.0.11.RELEASE</version>
</dependency>
</dependencies>
Logs:
15:19:05.732 [main] INFO DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
15:19:05.735 [main] INFO DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
15:19:05.737 [main] INFO DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
15:19:05.764 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.768 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.773 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.776 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.786 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.787 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.935 [main] INFO TomcatWebServer - Tomcat initialized with port(s): 8080 (http)
15:19:05.940 [main] INFO Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8080"]
15:19:05.941 [main] INFO StandardService - Starting service [Tomcat]
15:19:05.941 [main] INFO StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.38]
15:19:05.983 [main] INFO [/] - Initializing Spring embedded WebApplicationContext
15:19:05.983 [main] INFO ServletWebServerApplicationContext - Root WebApplicationContext: initialization completed in 711 ms
15:19:06.119 [main] INFO ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
15:19:06.377 [main] INFO ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
15:19:06.385 [main] INFO SimpleFunctionRegistry - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.402 [main] INFO SimpleFunctionRegistry - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.403 [main] INFO SimpleFunctionRegistry - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.418 [main] INFO DirectWithAttributesChannel - Channel 'application.input' has 1 subscriber(s).
15:19:06.421 [main] INFO SimpleFunctionRegistry - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.501 [main] INFO IntegrationMBeanExporter - Registering MessageChannel nullChannel
15:19:06.514 [main] INFO IntegrationMBeanExporter - Registering MessageChannel input
15:19:06.546 [main] INFO IntegrationMBeanExporter - Registering MessageChannel errorChannel
15:19:06.566 [main] INFO IntegrationMBeanExporter - Registering MessageHandler errorLogger
15:19:06.580 [main] INFO EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
15:19:06.580 [main] INFO PublishSubscribeChannel - Channel 'application.errorChannel' has 1 subscriber(s).
15:19:06.581 [main] INFO EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
15:19:06.581 [main] INFO DefaultBinderFactory - Creating binder: kafkaBinder
15:19:06.672 [main] INFO DefaultBinderFactory - Caching the binder: kafkaBinder
15:19:06.672 [main] INFO DefaultBinderFactory - Retrieving cached binder: kafkaBinder
15:19:06.716 [main] INFO AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [192.168.86.23:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
15:19:06.740 [main] INFO AppInfoParser - Kafka version: 2.5.1
15:19:06.740 [main] INFO AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
15:19:06.740 [main] INFO AppInfoParser - Kafka startTimeMs: 1605302346740
15:19:06.979 [main] INFO ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [192.168.86.23:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupA
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
15:19:06.997 [main] INFO AppInfoParser - Kafka version: 2.5.1
15:19:06.997 [main] INFO AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
15:19:06.997 [main] INFO AppInfoParser - Kafka startTimeMs: 1605302346997
15:19:07.014 [main] INFO Metadata - [Consumer clientId=consumer-groupA-1, groupId=groupA] Cluster ID: WmLQLxSaRrqxST80G6w-5w
15:19:07.031 [main] INFO BinderErrorChannel - Channel 'students.groupA.errors' has 1 subscriber(s).
15:19:07.031 [main] INFO BinderErrorChannel - Channel 'students.groupA.errors' has 0 subscriber(s).
15:19:07.031 [main] INFO BinderErrorChannel - Channel 'students.groupA.errors' has 1 subscriber(s).
15:19:07.031 [main] INFO BinderErrorChannel - Channel 'students.groupA.errors' has 2 subscriber(s).
15:19:07.038 [main] INFO ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [192.168.86.23:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupA
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
15:19:07.040 [main] INFO AppInfoParser - Kafka version: 2.5.1
15:19:07.040 [main] INFO AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
15:19:07.040 [main] INFO AppInfoParser - Kafka startTimeMs: 1605302347040
15:19:07.041 [main] INFO KafkaConsumer - [Consumer clientId=consumer-groupA-2, groupId=groupA] Subscribed to topic(s): students
15:19:07.042 [main] INFO ThreadPoolTaskScheduler - Initializing ExecutorService
15:19:07.044 [main] INFO KafkaMessageDrivenChannelAdapter - started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6232ffdb
15:19:07.045 [main] INFO Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
15:19:07.055 [main] INFO TomcatWebServer - Tomcat started on port(s): 8080 (http) with context path ''
15:19:07.059 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO Metadata - [Consumer clientId=consumer-groupA-2, groupId=groupA] Cluster ID: WmLQLxSaRrqxST80G6w-5w
15:19:07.060 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Discovered group coordinator DESKTOP-8T65SGP.lan:9092 (id: 2147483647 rack: null)
15:19:07.061 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] (Re-)joining group
15:19:07.066 [main] INFO Application - Started Application in 2.093 seconds (JVM running for 2.429)
15:19:07.074 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
15:19:07.074 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] (Re-)joining group
15:19:07.079 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Finished assignment for group at generation 73: {consumer-groupA-2-4e498561-d307-409b-bb51-09a57eff8b81=Assignment(partitions=[students-0])}
15:19:07.084 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Successfully joined group with generation 73
15:19:07.086 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Adding newly assigned partitions: students-0
15:19:07.094 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Setting offset for partition students-0 to the committed offset FetchPosition{offset=19, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[DESKTOP-8T65SGP.lan:9092 (id: 0 rack: null)], epoch=0}}
15:19:07.095 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO KafkaMessageChannelBinder$1 - groupA: partitions assigned: [students-0]
15:19:45.344 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
Kafka message received---> student info received
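Comment_1: After this line executes, this.foo is still NULL.
Comment_2: I don't think this configuration should be necessary, because ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG is already set in ConfigCustomizerTestConfig. However, if I remove the configuration from Application.yml, the line at Comment_1 is never hit.
I think I am missing something in my code. Any suggestions would be appreciated.
Update: I cannot find fooBean or MyConsumerInterceptor in the consumer config printed in the logs.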
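Ah, you are using a named binder, which is intended for multi-binder support; this technique only works with a single top-level binder. With a named binder, the interceptor.classes property is empty, which is why you needed to add it to the YAML. This is a known problem with multi-binder support. If you only use one binder, don't name it; just define it at the top level.
(See the edit below.)
Here is a workaround for named binders: override the binder factory bean. Unfortunately, I needed to copy some private code, but this works...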
@SpringBootApplication
public class So64826496Application {
public static void main(String[] args) {
SpringApplication.run(So64826496Application.class, args);
}
public static class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Foo foo;
@Override
public void configure(Map<String, ?> configs) {
this.foo = (Foo) configs.get("fooBean");
System.out.println(this.foo);
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.foo.foo("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
// BEGIN HACK TO INJECT CUSTOMIZER INTO BINDER
@Autowired(required = false)
private Collection<DefaultBinderFactory.Listener> binderFactoryListeners;
@Bean
public BinderFactory binderFactory(BinderTypeRegistry binderTypeRegistry,
BindingServiceProperties bindingServiceProperties) {
DefaultBinderFactory binderFactory = new DefaultBinderFactory(
getBinderConfigurations(binderTypeRegistry, bindingServiceProperties),
binderTypeRegistry) {
@Override
public synchronized <T> Binder<T, ?, ?> getBinder(String name,
Class<? extends T> bindingTargetType) {
Binder<T, ?, ?> binder = super.getBinder(name, bindingTargetType);
if (binder instanceof KafkaMessageChannelBinder) {
((KafkaMessageChannelBinder) binder).setConsumerConfigCustomizer(consumerConfigCustomizer());
}
return binder;
}
};
binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder());
binderFactory.setListeners(this.binderFactoryListeners);
return binderFactory;
}
// Had to copy this because it's private in BindingServiceConfiguration
private static Map<String, BinderConfiguration> getBinderConfigurations(
BinderTypeRegistry binderTypeRegistry,
BindingServiceProperties bindingServiceProperties) {
Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
Map<String, BinderProperties> declaredBinders = bindingServiceProperties
.getBinders();
boolean defaultCandidatesExist = false;
Iterator<Map.Entry<String, BinderProperties>> binderPropertiesIterator = declaredBinders
.entrySet().iterator();
while (!defaultCandidatesExist && binderPropertiesIterator.hasNext()) {
defaultCandidatesExist = binderPropertiesIterator.next().getValue()
.isDefaultCandidate();
}
List<String> existingBinderConfigurations = new ArrayList<>();
for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders
.entrySet()) {
BinderProperties binderProperties = binderEntry.getValue();
if (binderTypeRegistry.get(binderEntry.getKey()) != null) {
binderConfigurations.put(binderEntry.getKey(),
new BinderConfiguration(binderEntry.getKey(),
binderProperties.getEnvironment(),
binderProperties.isInheritEnvironment(),
binderProperties.isDefaultCandidate()));
existingBinderConfigurations.add(binderEntry.getKey());
}
else {
Assert.hasText(binderProperties.getType(),
"No 'type' property present for custom binder "
+ binderEntry.getKey());
binderConfigurations.put(binderEntry.getKey(),
new BinderConfiguration(binderProperties.getType(),
binderProperties.getEnvironment(),
binderProperties.isInheritEnvironment(),
binderProperties.isDefaultCandidate()));
existingBinderConfigurations.add(binderEntry.getKey());
}
}
for (Map.Entry<String, BinderConfiguration> configurationEntry : binderConfigurations
.entrySet()) {
if (configurationEntry.getValue().isDefaultCandidate()) {
defaultCandidatesExist = true;
}
}
if (!defaultCandidatesExist) {
for (Map.Entry<String, BinderType> binderEntry : binderTypeRegistry.getAll()
.entrySet()) {
if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
binderConfigurations.put(binderEntry.getKey(),
new BinderConfiguration(binderEntry.getKey(), new HashMap<>(),
true, "integration".equals(binderEntry.getKey()) ? false : true));
}
}
}
return binderConfigurations;
}
// END HACK
@Bean
public ConsumerConfigCustomizer consumerConfigCustomizer() {
return new ConsumerConfigCustomizer() {
@Override
public void configure(Map<String, Object> consumerProperties) {
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
MyConsumerInterceptor.class.getName());
consumerProperties.put("fooBean", foo());
}
};
}
@Bean
public Foo foo() {
return new Foo();
}
@Bean
Consumer<String> consume() {
return System.out::println;
}
}
class Foo {
void foo(String in) {
System.out.println(in);
}
}
And:
$ kafka-console-producer --bootstrap-server localhost:9092 --topic consume-in-0
>foo
Result:
consumer interceptor
foo
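Why the hand-off through the property map works can be sketched without Kafka or Spring on the classpath. The Kafka client creates the interceptor reflectively from its class name and then calls configure() with the consumer properties, so an object stored in that map is the only route by which a bean can reach the interceptor. In the sketch below, Configurable, MyInterceptor, instantiate() and the "foo:" prefix are hypothetical stand-ins invented for illustration, not Kafka or Spring Cloud Stream APIs; the explicit check in configure() is an added assumption that makes a missing bean fail early instead of surfacing later as a NullPointerException.

```java
import java.util.HashMap;
import java.util.Map;

public class InterceptorConfigSketch {

    /** Stand-in for the Spring bean from the question. */
    public static class Foo {
        public String foo(String what) {
            return "foo:" + what;
        }
    }

    /** Hypothetical stand-in for the configure() contract that interceptors implement. */
    public interface Configurable {
        void configure(Map<String, ?> configs);
    }

    /** Created reflectively, so it cannot receive Foo through constructor or field injection. */
    public static class MyInterceptor implements Configurable {
        private Foo foo;

        public MyInterceptor() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
            Object candidate = configs.get("fooBean");
            // Fail fast if the customizer was never applied to these properties.
            if (!(candidate instanceof Foo)) {
                throw new IllegalStateException("fooBean missing from consumer properties");
            }
            this.foo = (Foo) candidate;
        }

        public String intercept(String record) {
            return foo.foo(record);
        }
    }

    /** Mimics the client: load the class by name, call the no-arg constructor, then configure(). */
    public static Configurable instantiate(Map<String, Object> props) throws Exception {
        String className = (String) props.get("interceptor.classes");
        Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
        Configurable configurable = (Configurable) instance;
        configurable.configure(props);
        return configurable;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> props = new HashMap<>();
        props.put("interceptor.classes", MyInterceptor.class.getName());
        props.put("fooBean", new Foo());
        MyInterceptor interceptor = (MyInterceptor) instantiate(props);
        System.out.println(interceptor.intercept("consumer interceptor"));
    }
}
```

If the "fooBean" entry never reaches the map, which is what happens when the customizer is not applied to a named binder, configure() in this sketch throws immediately rather than leaving foo null until onConsume runs.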
EDIT
Starting with version 3.1, there is now a BinderCustomizer, which can be used to call addConsumerConfigCustomizer on specific binder(s). https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binder-customizer
Here is the code snippet from the documentation...
@Bean
public BinderCustomizer binderCustomizer() {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setRebalanceListener(...);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}
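I used Gary Russell's answer above. Since it took me some time to get it working, I am posting a shorter answer.
Tested
Create a configuration class that uses consumerProperties to bridge the missing native injection support.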
@Configuration
class KafkaBinderCustomizer(private val foo: Foo) {
@Bean
fun consumerConfigCustomizer(): ConsumerConfigCustomizer =
ConsumerConfigCustomizer { consumerProperties, bindingName, _ ->
consumerProperties[Foo::class.simpleName!!.lowercase()] = foo
logger.info("Injecting foo services to kafka {} interceptor", bindingName)
}
}
Set the dependency in the interceptor by overriding the configure method.
class MyConsumerInterceptor: ConsumerInterceptor<K, V> {
private lateinit var foo: Foo
// other methods (onConsume, onCommit, close) omitted
override fun configure(configs: MutableMap<String, *>) {
foo = configs[Foo::class.simpleName!!.lowercase()] as Foo
}
}
For a ProducerInterceptor, replace ConsumerConfigCustomizer with ProducerConfigCustomizer.