KStream not working with Spring Cloud Stream Kafka as a Function bean


I am implementing a Kafka KStream and GlobalKTable join with Spring Cloud Stream (Hoxton.SR2). I have declared the processor as a Function bean. Here is my function:

    @Bean
    public BiFunction<KStream<String, GenericDeviceEvent>, GlobalKTable<String, GenericAssetEvent>, KStream<String, GenericDeviceEvent>> enrich() {
        return (stream, table) -> stream.leftJoin(table,
            // KeyValueMapper: derive the GlobalKTable lookup key from each stream record
            (streamEventKey, streamEvent) -> {
                log.debug("Stream Event for enrichment - {}", streamEvent);
                return joinKeyProvider.getJoinKey(streamEvent);
            },
            // ValueJoiner: tableEvent is null when the left join finds no match
            (streamEvent, tableEvent) -> {
                log.debug("Matched table event - {}", tableEvent);
                if (tableEvent == null) {
                    log.warn("There is no joining Asset in the table, hence returning Device event without enriching..");
                    return streamEvent;
                }
                GenericDeviceEvent result = streamEnricher.enrichStream(streamEvent, tableEvent);
                // Log the enriched result rather than the input event
                log.info("Enriched result - {}", result);
                return result;
            });
    }
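
The value joiner above depends on left-join semantics: when the lookup in the table finds no record, the joiner is called with null for the table value and the stream record is passed through. A minimal plain-Java sketch of that behavior, with no Kafka dependency. `DeviceEvent`, `AssetEvent`, and the `assetId`/`assetName` fields are hypothetical stand-ins for `GenericDeviceEvent`, `GenericAssetEvent`, and whatever `joinKeyProvider` and `streamEnricher` actually do, and a `HashMap` stands in for the GlobalKTable.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of left-join semantics; no Kafka classes are involved.
class LeftJoinSketch {

    // Hypothetical stand-in for GenericDeviceEvent.
    static final class DeviceEvent {
        final String assetId;   // assumed join key
        final String assetName; // null until enriched

        DeviceEvent(String assetId, String assetName) {
            this.assetId = assetId;
            this.assetName = assetName;
        }
    }

    // Hypothetical stand-in for GenericAssetEvent.
    static final class AssetEvent {
        final String name;

        AssetEvent(String name) {
            this.name = name;
        }
    }

    // Mirrors the ValueJoiner: return the event unchanged when there is no match.
    static DeviceEvent joinValues(DeviceEvent device, AssetEvent asset) {
        if (asset == null) {
            return device;
        }
        return new DeviceEvent(device.assetId, asset.name);
    }

    // Mirrors leftJoin: look up the mapped key and pass null to the joiner on a miss.
    static DeviceEvent leftJoin(DeviceEvent device, Map<String, AssetEvent> table) {
        String joinKey = device.assetId; // plays the role of the KeyValueMapper
        return joinValues(device, table.get(joinKey));
    }

    public static void main(String[] args) {
        Map<String, AssetEvent> table = new HashMap<>();
        table.put("a-1", new AssetEvent("pump"));

        DeviceEvent matched = leftJoin(new DeviceEvent("a-1", null), table);
        DeviceEvent unmatched = leftJoin(new DeviceEvent("a-9", null), table);

        System.out.println(matched.assetName);   // pump
        System.out.println(unmatched.assetName); // null
    }
}
```

The unmatched event comes back unenriched instead of being dropped, which is the difference between a left join and an inner join.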

Here is my YAML:

spring:
  application:
    name: test
  profiles:
    include:
#    - streamBinding
    - deviceAssetEnricher
  cloud:
    stream:
      function:
        definition: enrich
        bindings:
          enrich-in-0:
            destination: device
          enrich-in-1:
            destination: assets
          output:
            destination: target
      kafka:
        streams:
          binder:
            application-id: enricher
            configuration:
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde      
              schema.registry.url: http://localhost:8085  
              specific.avro.reader: true
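
For comparison, in the functional programming model the binder derives binding names from the function name as `<functionName>-in-<index>` and `<functionName>-out-<index>`, and destinations are set under `spring.cloud.stream.bindings`. Below is a sketch of how the three bindings for `enrich` might look under that convention, reusing the topic names from the configuration above. It illustrates the naming scheme only and is not a confirmed fix for the problem described here.

```yaml
spring:
  cloud:
    stream:
      function:
        definition: enrich
      bindings:
        enrich-in-0:        # first argument: the KStream
          destination: device
        enrich-in-1:        # second argument: the GlobalKTable
          destination: assets
        enrich-out-0:       # the returned KStream
          destination: target
```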

Here is my pom:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>my.stream</groupId>
    <artifactId>kstream-globalktable-enricher</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kstream-globalktable-enricher</name>
    <description>Data enricher</description>

    <properties>
        <java.version>1.8</java.version>
        <avro.version>1.9.2</avro.version>
        <spring-cloud.version>Hoxton.SR2</spring-cloud.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <confluent.version>5.3.1</confluent.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-test-utils</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>5.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>

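On startup, all the consumer and producer bindings are created. The application also consumes the KStream and the GlobalKTable, performs the join, and produces the desired result, but:

  1. The result is never published to the Kafka topic, even though I can see this log message: [-StreamThread-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=enricher-6173f85c-adf0-4e66-bdd9-9ec6e3f6b04d-StreamThread-1-producer] Flushing accumulated records in producer.
  2. Every 30 seconds I get the following exception: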
ERROR [test,c95491b4791eb8fd,c95491b4791eb8fd,false] 28840 --- [ask-scheduler-4] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds

java.lang.ClassCastException: class com.sun.proxy.$Proxy159 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy159 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
    at org.springframework.cloud.stream.test.binder.TestSupportBinder.bindConsumer(TestSupportBinder.java:66) ~[spring-cloud-stream-test-support-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$0(BindingService.java:194) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67) ~[spring-cloud-sleuth-core-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]

After some research, I added the following property to disable these errors:

spring.autoconfigure.exclude: org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration

Now, on startup, I get the following error and the application fails to start:

java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced)  but no delegate has been set.
    at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:133) ~[spring-cloud-stream-binder-kafka-streams-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at com.sun.proxy.$Proxy130.to(Unknown Source) ~[na:na]
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.to(KStreamBinder.java:163) ~[spring-cloud-stream-binder-kafka-streams-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindProducer(KStreamBinder.java:133) ~[spring-cloud-stream-binder-kafka-streams-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindProducer(KStreamBinder.java:52) ~[spring-cloud-stream-binder-kafka-streams-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:296) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:271) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]

So I am observing completely different behavior just by bringing in the test classes.

I am clueless about this, so any help is appreciated.

apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka
1 Answer

Please ignore this post. I solved the problem. It was the declaration of the Function bean.
