Spring cloud stream kafka活页夹连接到docker-compose kafka

问题描述 投票:0回答:1

我想使用春季云流kafka活页夹。我使用以下docker-compose文件启动了2个kafka代理和1个zookeeper。

version: '3'

services:
  kafka-0:
    image: confluentinc/cp-kafka:5.2.1
    container_name: kafka-0
    restart: always
    ports:
      - "9094:9092"
    expose:
      - "9094"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-0:9094
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
      - KAFKA_ADVERTISED_HOST_NAME=kafka-0
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

  kafka-1:
    image: confluentinc/cp-kafka:5.2.1
    container_name: kafka-1
    restart: always
    ports:
      - "9095:9093"
    expose:
      - "9095"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9095
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
      - KAFKA_ADVERTISED_HOST_NAME=kafka-1
      - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

 zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    container_name: zookeeper
    ports:
      - "2182:2181"
    expose:
      - "2182"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2182

在我的spring boot应用程序中,我的属性文件是

spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9094,127.0.0.1:9095

我的春季云流源类如下

@EnableScheduling
@EnableBinding(Source.class)
public class UsageDetailSender {

    @Autowired
    private Source source;

    private String[] users = {"Glenn", "Sabby", "Mark", "Janne", "Ilaya"};

    @Scheduled(fixedDelay = 1000)
    public void sendEvents() {
        UsageDetail usageDetail = new UsageDetail();
        usageDetail.setUserId(this.users[new Random().nextInt(5)]);
        usageDetail.setDuration(new Random().nextInt(300));
        usageDetail.setData(new Random().nextInt(700));
        this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
    }
}

从AdminClientConfig值,看来我能够连接到2个kafka代理。

bootstrap.servers = [127.0.0.1:9094, 127.0.0.1:9095]
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

但是,我收到此错误。我该如何解决?

org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

2020-04-15 20:32:20.303  INFO 10384 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2020-04-15 20:32:20.314 ERROR 10384 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:290) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:137) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:78) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:193) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:151) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:268) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:243) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindOutputs(BindableProxyFactory.java:287) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:163) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at io.spring.dataflow.sample.usagedetailsender.UsageDetailSenderApplication.main(UsageDetailSenderApplication.java:10) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131) ~[idea_rt.jar:na]
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) ~[kafka-clients-2.0.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:323) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:299) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:281) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    ... 32 common frames omitted
apache-kafka spring-cloud spring-cloud-stream
1个回答
0
投票

您需要localhost:...上的监听器。

请参见here

© www.soinside.com 2019 - 2024. All rights reserved.