I'm still quite new to Kafka and learning as I go. I have an ECS-EC2 cluster running Kafka Connect, structured as follows:
├── bitbucket-pipelines.yml
├── docker-compose.yml
├── Dockerfile
├── env.sh
├── jmx-kafka-connect-config.yml
├── password.properties
├── rest-jaas.properties
├── task-definition.json
├── worker.properties
In my Dockerfile I use COPY to drop in the properties files I'm working with, and CMD connect-standalone /etc/kafka-connect/worker.properties to start it up.
I have two questions. Right now I'm running connect-standalone, but I've read that to scale out I need connect-distributed. How do I adjust this so that at least 3 workers are consuming from the same topics? That would be one way to get H.A.
docker-compose.yml
version: "3"
services:
kafka-connect:
build:
context: .
ports:
- "8083:8083"
- "8085:8085"
Dockerfile
FROM confluentinc/cp-server-connect:7.4.0
RUN confluent-hub install debezium/debezium-connector-mysql:2.1.4 --no-prompt
RUN confluent-hub install confluentinc/kafka-connect-elasticsearch:14.0.8 --no-prompt
USER root
RUN microdnf -y install vim
COPY jmx-kafka-connect-config.yml /etc/kafka-connect/jmx-kafka-connect-config.yml
COPY jmx_prometheus_javaagent-0.18.0.jar /etc/kafka-connect/jars/jmx_prometheus_javaagent.jar
COPY worker.properties /etc/kafka-connect/worker.properties
COPY password.properties /etc/kafka-connect/password.properties
COPY rest-jaas.properties /etc/kafka-connect/rest-jaas.properties
ENV KAFKA_JMX_PORT=1976
ENV KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka-connect/rest-jaas.properties -javaagent:/etc/kafka-connect/jars/jmx_prometheus_javaagent.jar=8085:/etc/kafka-connect/jmx-kafka-connect-config.yml"
EXPOSE 8083
EXPOSE 8085
RUN sed -i 's|log4j.rootLogger=INFO|log4j.rootLogger=ERROR|g' /etc/kafka/connect-log4j.properties
CMD connect-standalone /etc/kafka-connect/worker.properties
env.sh - run via the Bitbucket pipeline
#!/bin/sh
touch worker.properties
echo bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS >> worker.properties
echo confluent.license=$CONFLUENT_LICENSE >> worker.properties
echo connector.client.config.override.policy=All >> worker.properties
echo consumer.sasl.mechanism=PLAIN >> worker.properties
echo consumer.sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username="\"$KAFKA_SASL_USERNAME\"" password="\"$KAFKA_SASL_PASSWORD\"";" >> worker.properties
echo consumer.security.protocol=SASL_SSL >> worker.properties
echo producer.sasl.mechanism=PLAIN >> worker.properties
echo producer.sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username="\"$KAFKA_SASL_USERNAME\"" password="\"$KAFKA_SASL_PASSWORD\"";" >> worker.properties
echo producer.security.protocol=SASL_SSL >> worker.properties
echo sasl.mechanism=PLAIN >> worker.properties
echo sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username="\"$KAFKA_SASL_USERNAME\"" password="\"$KAFKA_SASL_PASSWORD\"";" >> worker.properties
echo security.protocol=SASL_SSL >> worker.properties
echo group.id=sv-connect >> worker.properties
echo config.storage.topic=sv-connect-configs >> worker.properties
echo offset.storage.topic=sv-connect-offsets >> worker.properties
echo status.storage.topic=sv-connect-status >> worker.properties
echo key.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo value.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo internal.key.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo internal.value.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo offset.storage.file.filename=/tmp/offset.txt >> worker.properties
echo rest.extension.classes=org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension >> worker.properties
echo log4j.root.loglevel=WARN >> worker.properties
echo log4j.loggers=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR >> worker.properties
echo config.storage.replication.factor=3 >> worker.properties
echo offset.storage.replication.factor=3 >> worker.properties
echo status.storage.replication.factor=3 >> worker.properties
echo plugin.path=/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components >> worker.properties
echo kafka.jmx.port=1976 >> worker.properties
echo kafka.opts=-javaagent:/etc/kafka-connect/jars/jmx_prometheus_javaagent.jar=8085:/etc/kafka-connect/jmx-kafka-connect-config.yml >> worker.properties
touch password.properties
echo "$KAFKA_SASL_USERNAME: $KAFKA_SASL_PASSWORD" >> password.properties
These values are passed in as environment variables.
By the way, you could use the kafka-connect-base image, since the connectors you're using don't require a license. Note, however, that those images use Jinja templates to define their configuration: there's really no reason to write out your own properties file by hand, especially when the values are all static (i.e. they don't depend on other environment variables, such as an AWS region that can only be determined at runtime).
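As a sketch of what that looks like: the Confluent Connect images map any environment variable prefixed with CONNECT_ into the corresponding worker property (dots become underscores), so the env.sh/COPY dance goes away. The broker address and advertised host name below are placeholders:

```yaml
# docker-compose.yml sketch — cp-kafka-connect-base renders the worker
# config from CONNECT_* environment variables via its Jinja templates,
# so no hand-written worker.properties is copied into the image.
version: "3"
services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:7.4.0
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS}   # placeholder
      CONNECT_GROUP_ID: sv-connect
      CONNECT_CONFIG_STORAGE_TOPIC: sv-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: sv-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: sv-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect        # placeholder
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
```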
"I'm running connect-standalone"

The image doesn't run this. It already defaults to connect-distributed.sh, so don't override the ENTRYPOINT / CMD and you're fine. Just scale up the ECS replicas.
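Since every worker with the same group.id joins the same Connect cluster and rebalances connector tasks among its members, scaling is just a matter of raising the service's desired count (a sketch; the cluster and service names here are hypothetical):

```shell
# Run three distributed workers; they join the same group.id and
# share connector tasks automatically (names are placeholders).
aws ecs update-service \
  --cluster my-ecs-cluster \
  --service kafka-connect \
  --desired-count 3
```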
Note: you could also use EKS and run Strimzi with its KafkaConnect k8s resource, which provides more useful features out of the box, including built-in JMX monitoring and alerting.
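If you did go the Strimzi route, the distributed cluster is declared in one resource rather than assembled by hand — roughly like this (a sketch; the bootstrap address is a placeholder, and Strimzi manages the worker group and internal topics for you):

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: sv-connect
spec:
  replicas: 3                      # three distributed workers for H.A.
  bootstrapServers: my-kafka:9092  # placeholder broker address
  config:
    group.id: sv-connect
    config.storage.topic: sv-connect-configs
    offset.storage.topic: sv-connect-offsets
    status.storage.topic: sv-connect-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
```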