Springboot Kafka Consumer无法维持与kafka集群代理的连接

问题描述 投票:0回答:1

我在 docker-compose 中使用以下配置部署了 3 节点 kafka 集群

Kafka集群:

services:

  kafka1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
    environment:
      CLUSTER_ID: "7bes6xCzSMeUR-RRas3gMQ"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 5000
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka1-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
    command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

  kafka2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9092"
    environment:
      CLUSTER_ID: "7bes6xCzSMeUR-RRas3gMQ"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka2:29092,PLAINTEXT_HOST://localhost:9093'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 5000
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka2 -Dcom.sun.management.jmxremote.rmi.port=9998'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka2:29092,CONTROLLER://kafka2:29093,PLAINTEXT_HOST://0.0.0.0:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka2-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
    command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

  kafka3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9092"
    environment:
      CLUSTER_ID: "7bes6xCzSMeUR-RRas3gMQ"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka3:29092,PLAINTEXT_HOST://localhost:9094'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 5000
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka3 -Dcom.sun.management.jmxremote.rmi.port=9999'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka3:29092,CONTROLLER://kafka3:29093,PLAINTEXT_HOST://0.0.0.0:9094'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka3-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
    command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

我的 Spring Boot 应用程序无法维持连接并生成以下日志

2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Discovered group coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-1, groupId=transaction-group] Discovered group coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Group coordinator localhost:9093 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted.
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-1, groupId=transaction-group] Group coordinator localhost:9093 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted.
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Requesting disconnect from last known coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-1, groupId=transaction-group] Requesting disconnect from last known coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.547-06:00  [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Discovered group coordinator localhost:9093 (id: 2147483645 rack: null)

这是我应用程序中的 kafka 属性。

kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    listener:
      ack-mode: manual
    properties:
      schema.registry.url: http://localhost:8185/
    consumer:
      group-id: transaction-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
      transaction-id-prefix: tx- 
      properties:
        enable-idempotence: true

是什么原因导致连接断开?当我部署单节点 kafka 集群时,这个问题就消失了。

spring-boot apache-kafka docker-compose
1个回答
0
投票

在您的 Compose 端口映射中,例如,您有

9093:9092
,但容器内没有为 9092 配置
listener

对于单节点集群来说它消失了,因为端口都是正确的。

© www.soinside.com 2019 - 2024. All rights reserved.