I am implementing a Kafka KStream–GlobalKTable join with Spring Cloud Stream (Hoxton.SR2). I have declared the processor as a function bean. Here is my function —
@Bean
public BiFunction<KStream<String, GenericDeviceEvent>, GlobalKTable<String, GenericAssetEvent>, KStream<String, GenericDeviceEvent>> enrich() {
    return (stream, table) -> stream.leftJoin(table,
        (streamEventKey, streamEvent) -> {
            log.debug("Stream Event for enrichment - {}", streamEvent);
            return joinKeyProvider.getJoinKey(streamEvent);
        },
        (streamEvent, tableEvent) -> {
            log.debug("Matched table event - {}", tableEvent);
            if (tableEvent == null) {
                log.warn("There is no joining Asset in the table, hence returning the Device event without enriching.");
                return streamEvent;
            }
            GenericDeviceEvent result = streamEnricher.enrichStream(streamEvent, tableEvent);
            log.info("Enriched result - {}", result);
            return result;
        });
}
Here is my YAML —
spring:
  application:
    name: test
  profiles:
    include:
      # - streamBinding
      - deviceAssetEnricher
  cloud:
    stream:
      function:
        definition: enrich
      bindings:
        enrich-in-0:
          destination: device
        enrich-in-1:
          destination: assets
        output:
          destination: target
      kafka:
        streams:
          binder:
            application-id: enricher
            configuration:
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
              schema.registry.url: http://localhost:8085
              specific.avro.reader: true
Here is my pom —
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.6.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>my.stream</groupId>
<artifactId>kstream-globalktable-enricher</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kstream-globalktable-enricher</name>
<description>Data enricher</description>
<properties>
    <java.version>1.8</java.version>
    <avro.version>1.9.2</avro.version>
    <spring-cloud.version>Hoxton.SR2</spring-cloud.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <confluent.version>5.3.1</confluent.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-schema</artifactId>
        <version>2.2.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-streams-avro-serde</artifactId>
        <version>5.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${confluent.version}</version>
    </dependency>
</dependencies>
On startup, all of the consumer and producer bindings are created. The application also consumes the KStream, performs the join with the GlobalKTable, and produces the expected result, but —
ERROR [test,c95491b4791eb8fd,c95491b4791eb8fd,false] 28840 --- [ask-scheduler-4] o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
java.lang.ClassCastException: class com.sun.proxy.$Proxy159 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy159 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
at org.springframework.cloud.stream.test.binder.TestSupportBinder.bindConsumer(TestSupportBinder.java:66) ~[spring-cloud-stream-test-support-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$0(BindingService.java:194) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67) ~[spring-cloud-sleuth-core-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
After some research, I added the following property to suppress these errors —
spring.autoconfigure.exclude: org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration
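For context, the `TestSupportBinder` in the first stack trace is contributed by `spring-cloud-stream-test-support`; when that artifact ends up on the runtime classpath, it registers itself in place of the real binder and attempts channel-style (`MessageChannel`) bindings, which the Kafka Streams binding types cannot satisfy. A common remedy, instead of excluding the auto-configuration, is to keep that dependency test-scoped. This is a sketch under the assumption that the artifact is being pulled in somewhere, since it does not appear in the pom above:

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>
```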
Now on startup I get the following error and the application fails to start —
java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced) but no delegate has been set.
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:133) ~[spring-cloud-stream-binder-kafka-streams-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at com.sun.proxy.$Proxy130.to(Unknown Source) ~[na:na]
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.to(KStreamBinder.java:163) ~[spring-cloud-stream-binder-kafka-streams-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindProducer(KStreamBinder.java:133) ~[spring-cloud-stream-binder-kafka-streams-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindProducer(KStreamBinder.java:52) ~[spring-cloud-stream-binder-kafka-streams-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:296) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:271) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
So I am observing completely different behavior simply by introducing a test class. Since I have no clue what is going on here, any help would be greatly appreciated.
Update: please ignore this post. I solved the problem; it was the declaration of the Function bean.
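For anyone hitting the same errors: in the functional programming model, Spring Cloud Stream derives binding names from the function bean, so a `BiFunction` named `enrich` binds its output as `enrich-out-0`, not `output`. Since the original post does not show the final fix, this is only a hedged sketch of what the bindings section would typically look like under that convention:

```yaml
spring:
  cloud:
    stream:
      bindings:
        enrich-in-0:
          destination: device
        enrich-in-1:
          destination: assets
        enrich-out-0:          # derived name: <functionName>-out-<index>
          destination: target
```

With a leftover `output` binding, the binder tries to create a channel-style producer binding for which no KStream delegate exists, which is consistent with the `prepareConsumerBinding ... but no delegate has been set` failure above.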