I am trying to run a Kafka Connect S3 Sink docker on the same host as my Kafka docker. I want to partition the output by wall-clock time. However, my Kafka Connect fails to start with the following error:
ERROR [Worker clientId=connect-1, groupId=kkconnect] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.connect.storage.partitioner.TimeBasedPartitioner for configuration partitioner.class: Class io.confluent.connect.storage.partitioner.TimeBasedPartitioner could not be found.
Here is the command I use to start that docker:
docker run -d \
--name=my-kkc \
--net=host \
--restart unless-stopped \
--log-driver=awslogs \
--log-opt awslogs-stream=kk-cn-1 \
--log-opt awslogs-region=ap-southeast-2 \
--log-opt awslogs-group=kafka-cn-1 \
-e CONNECT_BOOTSTRAP_SERVERS=xxx.xxx.xxx.xxx:9092 \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_REST_PORT=28082 \
-e CONNECT_GROUP_ID="kkconnect" \
-e CONNECT_CONFIG_STORAGE_TOPIC="kkconnect-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="kkconnect-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="kkconnect-status" \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
-e CONNECT_SCHEMA_COMPATIBILITY=NONE \
-e CONNECT_PLUGIN_PATH=/usr/share/java \
-e CONNECT_NAME="s3-sink" \
-e CONNECT_CONNECTOR_CLASS="io.confluent.connect.s3.S3SinkConnector" \
-e CONNECT_TASKS_MAX=1 \
-e CONNECT_TOPICS="mytopic" \
-e CONNECT_ROTATE_INTERVAL_MS=60000 \
-e CONNECT_FLUSH_SIZE=3 \
-e CONNECT_S3_BUCKET_NAME="xxx-xxx" \
-e CONNECT_S3_PART_SIZE=5242880 \
-e CONNECT_STORAGE_CLASS="io.confluent.connect.s3.storage.S3Storage" \
-e CONNECT_FORMAT_CLASS="io.confluent.connect.s3.format.bytearray.ByteArrayFormat" \
-e CONNECT_FORMAT_BYTEARRAY_EXTENSION=json \
-e CONNECT_PARTITIONER_CLASS="io.confluent.connect.storage.partitioner.TimeBasedPartitioner" \
-e CONNECT_PARTITION_DURATION_MS=60000 \
-e CONNECT_PATH_FORMAT="'kafka/mytopic/year'=YYYY/'month'=MM/'day'=dd/'hour'=HH" \
-e CONNECT_TIMESTAMP_EXTRACTOR=Wallclock \
-e CONNECT_S3_COMPRESSION_TYPE=gzip \
confluentinc/cp-kafka-connect:5.3.0
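For context: the `cp-kafka-connect` image turns each `CONNECT_`-prefixed environment variable into a Connect worker property by stripping the prefix, lowercasing, and replacing underscores with dots. A rough sketch of that mapping (hypothetical helper name `env_to_prop`; the image's special escaping rules for literal underscores are ignored here):

```shell
#!/bin/sh
# Rough sketch of how the cp-kafka-connect image derives worker property
# names from CONNECT_* environment variables: strip prefix, lowercase,
# underscores -> dots (escaping rules for literal underscores ignored).
env_to_prop() {
  printf '%s\n' "${1#CONNECT_}" | tr 'A-Z_' 'a-z.'
}

env_to_prop CONNECT_PARTITIONER_CLASS   # partitioner.class
env_to_prop CONNECT_PLUGIN_PATH         # plugin.path
```

So `CONNECT_PARTITIONER_CLASS` becomes `partitioner.class` — the property named in the error above — but it lands in the worker config rather than a connector config.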
Could you please help?
Thanks and regards, Averell
As @OneCricketeer mentioned in the comments, all the connector-level parameters need to go in a POST request to the Connect REST API, not in environment variables; only CONNECT_PLUGIN_PATH and the other worker-level variables belong on the container.
So, once you have spun up the container with the worker-level variables above, CONNECT_PLUGIN_PATH included, run this command:
curl -d @path/to/your-config.json -H "Content-Type: application/json" -H "Accept: application/json" -X POST http://localhost:28082/connectors
where path/to/your-config.json would look like this:
{
  "name": "s3-sink",
  "config": {
    "name": "s3-sink",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "mytopic",
    // and so on...
  }
}
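For completeness, here is a sketch of the full payload, assembled from the connector-level variables in your docker command (untested; values copied from the question). Note that TimeBasedPartitioner also requires `locale` and `timezone`, which your command did not set — the values below are example placeholders:

```shell
# Sketch: the connector-level settings from the docker command above,
# rewritten as the JSON payload for the Connect REST API.
# locale/timezone are assumed example values, required by TimeBasedPartitioner.
cat > your-config.json <<'EOF'
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "mytopic",
    "rotate.interval.ms": "60000",
    "flush.size": "3",
    "s3.bucket.name": "xxx-xxx",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "format.bytearray.extension": "json",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "60000",
    "path.format": "'kafka/mytopic/year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "timestamp.extractor": "Wallclock",
    "locale": "en-US",
    "timezone": "UTC"
  }
}
EOF
```

After the POST, you can check the result with `curl http://localhost:28082/connectors/s3-sink/status`.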