Running Kafka Connect with an S3 sink on Docker/ECS produces nothing in S3

Problem description

I am trying to run Kafka Connect with an S3 sink (reading from Kafka and writing to S3) on Elastic Container Service.

Dockerfile

FROM confluentinc/cp-kafka-connect-base:latest
WORKDIR /etc/kafka-connect
COPY entrypoint.sh .
COPY kafka-connect.properties .
COPY s3-sink.properties .
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest
ENV CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars
CMD /etc/kafka-connect/entrypoint.sh

entrypoint.sh

PROPERTIES_FILE="/etc/kafka-connect/kafka-connect.properties"
S3_SINK_PROPERTIES_FILE="/etc/kafka-connect/s3-sink.properties"
sed -i "s|KAFKA_USERNAME|$KAFKA_USERNAME|g" $PROPERTIES_FILE
sed -i "s|KAFKA_PASSWORD|$KAFKA_PASSWORD|g" $PROPERTIES_FILE

sed -i "s|AWS_ACCESS_KEY_ID|$AWS_ACCESS_KEY_ID|g" $CONSOLE_SINK_PROPERTIES_FILE
sed -i "s|AWS_SECRET_ACCESS_KEY|$AWS_SECRET_ACCESS_KEY|g" $CONSOLE_SINK_PROPERTIES_FILE

/usr/bin/connect-distributed $PROPERTIES_FILE $S3_SINK_PROPERTIES_FILE

kafka-connect.properties contains the following keys:

name=abc
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=topicName

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter
group.id=myGroupId
bootstrap.servers=bootstrapServer:9091,...
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="KAFKA_USERNAME" password="KAFKA_PASSWORD";
sasl.mechanism=PLAIN

key.converter.schemas.enable=false
value.converter.schemas.enable=true

value.converter.basic.auth.credentials.source=USER_INFO
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.basic.auth.credentials.source=USER_INFO
key.converter.basic.auth.credentials.source=USER_INFO

offset.storage.topic=test-offsets-topic
config.storage.topic=test-config-topic
status.storage.topic=test-status-topic

KAFKA_USERNAME and KAFKA_PASSWORD are replaced with environment variables in entrypoint.sh.

s3-sink.properties

format.class=io.confluent.connect.s3.format.json.JsonFormat
s3.bucket.name=bucket-name
s3.region=eu-west-2
s3.part.size=5242880
aws.access.key.id=AWS_ACCESS_KEY_ID
aws.secret.access.key=AWS_SECRET_ACCESS_KEY
storage.class=io.confluent.connect.s3.storage.S3Storage

Likewise, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are also replaced with environment variables.

However, when Connect is running, I don't see anything produced to the S3 bucket even though there is plenty of traffic on the topic. What am I doing wrong? There are no errors in the CloudWatch logs, only this INFO message:

[2024-01-09 23:10:27,086] INFO These configurations '[connector.class, config.storage.topic, tasks.max, topics, group.id, status.storage.topic, value.converter.value.subject.name.strategy, value.converter.basic.auth.credentials.source, key.converter.schemas.enable, schema.registry.basic.auth.credentials.source, name, value.converter.schemas.enable, offset.storage.topic, value.converter, key.converter, key.converter.basic.auth.credentials.source]' were supplied but are not used yet. (org.apache.kafka.clients.admin.AdminClientConfig:378)
1 Answer

This worked for me. Some configuration was missing from the Kafka consumer properties, and those properties need to be defined separately for the consumer and for the sink connector. Once Kafka Connect has started successfully, the sink connector also needs to be started/registered.

connect-distributed-custom.properties

group.id=producer-my-group-id
bootstrap.servers=KAFKA_BOOTSTRAP_SERVERS
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="KAFKA_USERNAME" password="KAFKA_PASSWORD";
sasl.mechanism=PLAIN

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter

offset.storage.topic=test-offsets-topic
config.storage.topic=test-config-topic
status.storage.topic=test-status-topic

plugin.path=/usr/share/confluent-hub-components

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="KAFKA_USERNAME" password="KAFKA_PASSWORD";
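
On ECS, the KAFKA_BOOTSTRAP_SERVERS, KAFKA_USERNAME, KAFKA_PASSWORD, SCHEMA_REGISTRY and SCHEMA_AUTH placeholders are expected to arrive as container environment variables. A hypothetical containerDefinitions fragment from the task definition might look like the sketch below (the image URI and secret ARNs are made up; plain values can go in environment, sensitive ones in secrets):

{
  "name": "kafka-connect-s3",
  "image": "123456789012.dkr.ecr.eu-west-2.amazonaws.com/kafka-connect-s3:latest",
  "portMappings": [{ "containerPort": 8083 }],
  "environment": [
    { "name": "KAFKA_BOOTSTRAP_SERVERS", "value": "bootstrapServer:9091" },
    { "name": "SCHEMA_REGISTRY", "value": "https://my-schema-registry.example.com" }
  ],
  "secrets": [
    { "name": "KAFKA_USERNAME", "valueFrom": "arn:aws:secretsmanager:eu-west-2:123456789012:secret:kafka-username" },
    { "name": "KAFKA_PASSWORD", "valueFrom": "arn:aws:secretsmanager:eu-west-2:123456789012:secret:kafka-password" },
    { "name": "SCHEMA_AUTH", "valueFrom": "arn:aws:secretsmanager:eu-west-2:123456789012:secret:schema-auth" }
  ]
}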

Dockerfile

FROM confluentinc/cp-kafka-connect-base:7.5.3

WORKDIR /etc/kafka

COPY entrypoint.sh .
COPY connect-distributed-custom.properties .
COPY s3-sink.json .


# Expose the Connect REST API port (8083) and port 9092
EXPOSE 8083
EXPOSE 9092

CMD /etc/kafka/entrypoint.sh
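
To sanity-check the image locally before deploying to ECS, it can be built and run with the same environment variables the entrypoint expects (all values below are placeholders). Since the s3-sink.json below no longer sets aws.access.key.id/aws.secret.access.key, S3 credentials should come from the standard AWS credential chain, i.e. the task role on ECS or the AWS_* environment variables locally:

docker build -t kafka-connect-s3 .

docker run --rm -p 8083:8083 \
  -e KAFKA_BOOTSTRAP_SERVERS="bootstrapServer:9091" \
  -e KAFKA_USERNAME="myUser" \
  -e KAFKA_PASSWORD="myPassword" \
  -e SCHEMA_REGISTRY="https://my-schema-registry.example.com" \
  -e SCHEMA_AUTH="sr-user:sr-password" \
  -e AWS_ACCESS_KEY_ID="..." \
  -e AWS_SECRET_ACCESS_KEY="..." \
  kafka-connect-s3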

s3-sink.json

{
  "name": "s3-sink-test",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "topicName",
    "s3.region": "eu-west-2",
    "s3.bucket.name": "test-kafka-connect",
    "s3.part.size": "5242880",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "name": "s3-sink-test",
    "flush.size": "100",
    "topics.dir": "kafka-topics-5",
    "consumer.override.auto.offset.reset": "earliest",
    "behavior.on.null.values": "ignore",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",

    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",

    "basic.auth.user.info": "SCHEMA_AUTH",
    "schema.registry.basic.auth.user.info": "SCHEMA_AUTH",
    "key.converter.basic.auth.user.info": "SCHEMA_AUTH",
    "value.converter.basic.auth.user.info": "SCHEMA_AUTH",

    "schema.registry.url": "SCHEMA_REGISTRY",
    "value.converter.schema.registry.url": "SCHEMA_REGISTRY",

    "basic.auth.credentials.source": "USER_INFO",
    "schema.registry.basic.auth.credentials.source": "USER_INFO",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.credentials.source": "USER_INFO",


    "value.converter.value.subject.name.strategy":  "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
    "errors.log.include.messages": "true",
    "errors.deadletterqueue.topic.name":"deadLetterQueueTopic",
    "schema.compatibility": "NONE",
    "errors.log.enable": "true",
    "errors.tolerance": "all",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en",
    "timezone": "UTC",
    "timestamp.extractor": "Record",
    "partition.duration.ms": "300000",
    "rotate.schedule.interval.ms": "300000"
  }
}
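
With topics.dir=kafka-topics-5, the TimeBasedPartitioner and the path.format above, objects should end up under S3 keys roughly like the following (dates and offsets are illustrative, and the file name assumes the connector's default <topic>+<partition>+<startOffset> pattern):

s3://test-kafka-connect/kafka-topics-5/topicName/year=2024/month=01/day=09/hour=23/topicName+0+0000000000.json
s3://test-kafka-connect/kafka-topics-5/topicName/year=2024/month=01/day=09/hour=23/topicName+0+0000000100.json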

entrypoint.sh

#!/bin/bash

PROPERTIES_FILE="/etc/kafka/connect-distributed-custom.properties"

S3_SINK_PROPERTIES_FILE="/etc/kafka/s3-sink.json"

sed -i "s|KAFKA_BOOTSTRAP_SERVERS|$KAFKA_BOOTSTRAP_SERVERS|g" $PROPERTIES_FILE
sed -i "s|KAFKA_USERNAME|$KAFKA_USERNAME|g" $PROPERTIES_FILE
sed -i "s|KAFKA_PASSWORD|$KAFKA_PASSWORD|g" $PROPERTIES_FILE

sed -i "s|SCHEMA_REGISTRY|$SCHEMA_REGISTRY|g" $S3_SINK_PROPERTIES_FILE

sed -i "s|SCHEMA_AUTH|$SCHEMA_AUTH|g" $S3_SINK_PROPERTIES_FILE

# Install the S3 sink plugin (lands in /usr/share/confluent-hub-components, matching plugin.path)
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.7

# Start the Connect worker in the background and give the REST API time to come up
/usr/bin/connect-distributed $PROPERTIES_FILE &
sleep 60


echo "===Creating Connector===="

curl -X POST -H "Content-Type: application/json" --data @"$S3_SINK_PROPERTIES_FILE" http://localhost:8083/connectors

echo "===Done Creating Connector===="

tail -f /dev/null
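
Once the entrypoint has posted the connector config, registration can be verified against the Connect REST API on port 8083 (standard endpoints), e.g. from inside the container:

# List registered connectors
curl http://localhost:8083/connectors

# Connector and task state should be RUNNING
curl http://localhost:8083/connectors/s3-sink-test/status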