Spring Cloud Kafka流DefaultPollableMessageSource不可用

问题描述 投票:0回答:1

您好,我正在测试Spring Cloud和Kafka流,但出现错误。

错误日志:

org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.binder.DefaultPollableMessageSource' : , and no default binder has been set.
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at com.learn.SpringKafkaApplication.main(SpringKafkaApplication.java:30) ~[classes/:na]


Caused by: java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.binder.DefaultPollableMessageSource' : , and no default binder has been set.
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:183) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:134) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:362) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:95) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    ... 14 common frames omitted

这是我的代码:

1

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.binder.PollableMessageSource;

public interface SynSink {

    @Input("input")
    PollableMessageSource source();

}

2

import com.learn.bind.SynSink;
import com.learn.model.Employee;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;


@SpringBootApplication(scanBasePackages = "com.learn.*")
@EnableBinding({Source.class, SynSink.class})
@EnableScheduling
public class SpringKafkaApplication {

    @Autowired
    private PollableMessageSource messageSource;

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

  @Scheduled(fixedRate = 5000)
  public void getMessage() {
      messageSource.poll(m -> System.out.print(m.getPayload()), new 
          ParameterizedTypeReference<Employee>() {
      });

  }

}
apache-kafka-streams spring-cloud-stream
1个回答
0
投票

按预期,您的类路径中有多个活页夹:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

如果不使用,可以删除kafka-streams。

或在application.properties中指定默认的资料夹:

spring.cloud.stream.default-binder=kafka

有关更多信息,请参阅Spring Cloud Stream文档中的多重绑定部分。

© www.soinside.com 2019 - 2024. All rights reserved.