我正在编写我的第一个基于 Kafka 的项目,并遇到了一些集成问题。卡夫卡和卡夫卡连接。这是我的
docker-compose.yaml
version: '3.3'
services:
database:
image: 'postgres:16.2-alpine'
restart: always
shm_size: 128mb
ports:
- 5432:5432
env_file:
- ./database/.env
volumes:
- ./database/data:/var/lib/postgresql/data/
- ./database/init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
priv-aeroflot-pilot-net:
ipv4_address: 172.16.254.3
depends_on:
- kafka
- kafka-connect
zookeeper:
image: zookeeper:3.7.0
ports:
- "2181:2181"
volumes:
# - ./kafka/zookeeper/data:/var/lib/zookeeper/data
- ./kafka/zookeeper/data:/data
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
priv-aeroflot-pilot-net:
ipv4_address: 172.16.254.5
kafka:
image: apache/kafka:3.7.0
ports:
- 9092:9092
volumes:
# - ./kafka/connect-plugins:/opt/kafka/connect-plugins
- ./kafka/config:/opt/kafka/custom-config
- ./kafka/logs:/var/lib/kafka/data
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
networks:
priv-aeroflot-pilot-net:
ipv4_address: 172.16.254.2
depends_on:
- zookeeper
# kafka-connect:
# image: apache/kafka:3.7.0
# command: bash -c "/opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties"
# volumes:
# - ./kafka/connect-plugins:/opt/kafka/connect-plugins
# - ./kafka/connect-standalone.properties:/opt/kafka/config/connect-standalone.properties
# - ./kafka/connect-standalone-worker.properties:/opt/kafka/config/connect-standalone-worker.properties
# networks:
# priv-aeroflot-pilot-net:
# ipv4_address: 172.16.254.4
# depends_on:
# - kafka
# adminer:
# image: adminer
# restart: always
# ports:
# - 8080:8080
# networks:
# priv-aeroflot-pilot-net:
# ipv4_address: 172.16.254.1
networks:
priv-aeroflot-pilot-net:
driver: bridge
ipam:
config:
- subnet: 172.16.254.0/28
Kafka connect 暂时被注释掉,以解决 Kafka Zookeeper 与 KRaft 冲突的第一个问题。
此时 Kafka 服务出现问题退出:
The kafka configuration file appears to be for a legacy cluster. Formatting is only supported for clusters in KRaft mode.
Zookeeper
将在 Apache Kafka 的下一个版本中删除,但在 3.7 版本中它只是被标记为已弃用。 Zookeeper
和 KRaft
是互斥的机制,不应该一起工作,但我既没有从 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
配置中省略 KRaft
,也没有省略与 KRaft
相关的配置,我收到了异常。
ConfigException: Missing required configuration `zookeeper.connect` which has no default value.
或者像上面的另一个一样。
一般情况是我希望将我的 Kafka 数据镜像到 postgres 中。我已经有了工作中的 Kafka 服务队列,其中生产者放置消息,消费者读取它们,除了 Kafka 队列之外,我现在还希望将此消息存储在 PostgreSQL 数据库中。我没有编写 PostgreSQL 接收器连接器并选择
Zookeeper
或 KRaft
编排模式。我避免使用像 Confluence 这样的专有解决方案,并在普通 Kafka 上尽可能保持纯粹,尽管完全开源的解决方案也可以。
您能帮我解决我当前的问题吗?为了简单起见,我同意
Zookeeper
并推迟 KRaft
集成以供将来使用。
我还没有在基于 docker 的
apache/kafka
图像上解决这个问题,但我已经通过迁移到 confluentinc/cn-kafka
docker 图像解决了我的任务。这是最后的docker-compose.yaml
:
version: '3.3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
priv-aeroflot-pilot-net:
ipv4_address: 172.16.254.2
kafka:
image: confluentinc/cp-kafka:6.2.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.16.254.3:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# KAFKA_DEFAULT_REPLICATION_FACTOR: 1
# CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
# KAFKA_CONFLUENT_METADATA_TOPIC_REPLICATION_FACTOR: 1
# KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
networks:
priv-aeroflot-pilot-net:
ipv4_address: 172.16.254.3
connect:
image: confluentinc/cp-kafka-connect:6.2.0
container_name: connect
hostname: connect
depends_on:
- kafka
- postgres
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: 172.16.254.4
CONNECT_GROUP_ID: connect-cluster
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
CONNECT_REPLICATION_FACTOR: 1
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CLASSPATH: /usr/share/java/postgresql-42.2.23.jar:/usr/share/confluent-hub-components/kafka-connect-jdbc-10.7.6.jar
volumes:
- ./kafka-connect/connectors:/usr/share/confluent-hub-components
- ./kafka-connect/config:/etc/kafka-connect
- ./kafka-connect/postgresql-42.2.23.jar:/usr/share/java/postgresql-42.2.23.jar
# - ./kafka-connect/jdbc-driver/postgresql-42.2.23.jar:/usr/share/java/postgresql-42.2.3.jar
networks:
priv-aeroflot-pilot-net:
ipv4_address: 172.16.254.4
postgres:
image: 'postgres:16.2-alpine'
container_name: postgres
restart: always
shm_size: 128mb
ports:
- 5432:5432
env_file:
- ./database/.env
volumes:
- ./database/data:/var/lib/postgresql/data/
- ./database/init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
priv-aeroflot-pilot-net:
ipv4_address: 172.16.254.5
depends_on:
- kafka
networks:
priv-aeroflot-pilot-net:
driver: bridge
ipam:
config:
- subnet: 172.16.254.0/28
Confluence 在
Apache 2.0
许可下共享此图像,这对于当前目标来说是可以的,但是我仍然有兴趣在纯 Apache Kafka
上找到解决方案。