Kafka Connect cluster properties


I'm fairly new to Kafka and still learning. I have an ECS (EC2) cluster running Kafka Connect, structured as follows:

├── bitbucket-pipelines.yml
├── docker-compose.yml
├── Dockerfile
├── env.sh 
├── jmx-kafka-connect-config.yml
├── password.properties
├── rest-jaas.properties
├── task-definition.json
├── worker.properties

In my Dockerfile I use `COPY` to drop in the properties files I'm working with, and `CMD connect-standalone /etc/kafka-connect/worker.properties` to start the worker.

I have two questions.

  1. In some tutorials the setup is quite different: the configuration is passed through environment variables. Is my approach valid, or do I need to change it?
  2. I'm running `connect-standalone`, but I've read that to scale out I need to use `connect-distributed`. How do I adjust my setup so that at least 3 workers cooperate on the same topics? That would be one way to get H.A.

docker-compose.yml

version: "3"

services:
  kafka-connect:
    build:
      context: .
    ports:
      - "8083:8083"
      - "8085:8085"

Dockerfile

FROM confluentinc/cp-server-connect:7.4.0

RUN confluent-hub install debezium/debezium-connector-mysql:2.1.4 --no-prompt
RUN confluent-hub install confluentinc/kafka-connect-elasticsearch:14.0.8 --no-prompt

USER root

RUN microdnf -y install vim

COPY jmx-kafka-connect-config.yml /etc/kafka-connect/jmx-kafka-connect-config.yml
COPY jmx_prometheus_javaagent-0.18.0.jar /etc/kafka-connect/jars/jmx_prometheus_javaagent.jar
COPY worker.properties /etc/kafka-connect/worker.properties
COPY password.properties /etc/kafka-connect/password.properties
COPY rest-jaas.properties /etc/kafka-connect/rest-jaas.properties

ENV KAFKA_JMX_PORT=1976
ENV KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka-connect/rest-jaas.properties -javaagent:/etc/kafka-connect/jars/jmx_prometheus_javaagent.jar=8085:/etc/kafka-connect/jmx-kafka-connect-config.yml"

EXPOSE 8083
EXPOSE 8085

RUN sed -i 's|log4j.rootLogger=INFO|log4j.rootLogger=ERROR|g' /etc/kafka/connect-log4j.properties

CMD connect-standalone /etc/kafka-connect/worker.properties

env.sh - run via the Bitbucket pipeline

#!/bin/sh
touch worker.properties

echo bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS >> worker.properties
echo confluent.license=$CONFLUENT_LICENSE >> worker.properties
echo connector.client.config.override.policy=All >> worker.properties
echo consumer.sasl.mechanism=PLAIN >> worker.properties
echo consumer.sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username="\"$KAFKA_SASL_USERNAME\"" password="\"$KAFKA_SASL_PASSWORD\"";" >> worker.properties
echo consumer.security.protocol=SASL_SSL >> worker.properties
echo producer.sasl.mechanism=PLAIN >> worker.properties
echo producer.sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username="\"$KAFKA_SASL_USERNAME\"" password="\"$KAFKA_SASL_PASSWORD\"";" >> worker.properties
echo producer.security.protocol=SASL_SSL >> worker.properties
echo sasl.mechanism=PLAIN >> worker.properties
echo sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username="\"$KAFKA_SASL_USERNAME\"" password="\"$KAFKA_SASL_PASSWORD\"";" >> worker.properties
echo security.protocol=SASL_SSL >> worker.properties
echo group.id=sv-connect >> worker.properties
echo config.storage.topic=sv-connect-configs >> worker.properties
echo offset.storage.topic=sv-connect-offsets >> worker.properties
echo status.storage.topic=sv-connect-status >> worker.properties
echo key.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo value.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo internal.key.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo internal.value.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo offset.storage.file.filename=/tmp/offset.txt >> worker.properties
echo rest.extension.classes=org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension >> worker.properties
echo log4j.root.loglevel=WARN >> worker.properties
echo log4j.loggers=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR >> worker.properties
echo config.storage.replication.factor=3 >> worker.properties
echo offset.storage.replication.factor=3 >> worker.properties
echo status.storage.replication.factor=3 >> worker.properties
echo plugin.path=/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components >> worker.properties
echo kafka.jmx.port=1976 >> worker.properties
echo kafka.opts=-javaagent:/etc/kafka-connect/jars/jmx_prometheus_javaagent.jar=8085:/etc/kafka-connect/jmx-kafka-connect-config.yml >> worker.properties

touch password.properties
echo "$KAFKA_SASL_USERNAME: $KAFKA_SASL_PASSWORD" >> password.properties
1 Answer

> the configuration is passed through environment variables

As an aside, you could use the `kafka-connect-base` image, since the connectors you're using don't require a license. However, these images use Jinja templates to define the configuration.

There's really no reason to copy in your own values, especially if they're all static (i.e. they don't depend on other environment variables, such as an AWS region that can only be determined at runtime).

https://github.com/confluentinc/kafka-images/blob/master/kafka-connect-base/include/etc/confluent/docker/kafka-connect.properties.template
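For illustration, here is a minimal sketch of the env-var style with the Confluent images: each `CONNECT_*` variable is rendered by the image's templates into the matching worker property (dots become underscores). The broker address and names below are placeholders, and SASL credentials are omitted:

```yaml
# docker-compose.yml sketch (assumed values, not a drop-in config):
# CONNECT_<PROPERTY> env vars become <property> entries in the worker config.
services:
  kafka-connect:
    build:
      context: .
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"              # placeholder
      CONNECT_GROUP_ID: "sv-connect"
      CONNECT_CONFIG_STORAGE_TOPIC: "sv-connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "sv-connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "sv-connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"    # placeholder
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
```

With this approach the `env.sh` step that writes `worker.properties` by hand becomes unnecessary; the same values move into the task definition or compose file as environment variables.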

> I'm running `connect-standalone`

The image doesn't run this. It already, correctly, defaults to `connect-distributed`, so don't override the ENTRYPOINT / CMD and you'll be fine. Then just scale up the ECS replicas.
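In distributed mode, connectors are not defined in a properties file passed on the command line; you POST their configuration to any worker's REST API (port 8083 here) and the group distributes the tasks across workers. A trimmed-down illustration only — hostnames and credentials are placeholders, and a real Debezium MySQL 2.x connector needs further required settings (e.g. `topic.prefix` and schema-history configuration):

```json
{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.internal",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "********",
    "tasks.max": "3"
  }
}
```

Submitted with something like `curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors`; with `tasks.max` of 3 and 3 ECS replicas in the same `group.id`, the tasks can be spread one per worker.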

Note: you could also use EKS and run Strimzi with its `KafkaConnect` k8s resource, which offers more useful features, including built-in JMX monitoring and alerting.
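For reference, a minimal sketch of such a Strimzi resource — names and the bootstrap address are placeholders, and TLS/authentication settings are omitted:

```yaml
# Strimzi KafkaConnect resource sketch; replicas: 3 gives the
# three-worker group asked about above. All names are assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: sv-connect
spec:
  replicas: 3
  bootstrapServers: broker:9092        # placeholder
  config:
    group.id: sv-connect
    config.storage.topic: sv-connect-configs
    offset.storage.topic: sv-connect-offsets
    status.storage.topic: sv-connect-status
```

Strimzi's operator then manages the worker deployment, so scaling is a one-line change to `replicas`.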
