Kafka Connect S3 - partitioner.class - Time-based partitioner class could not be found

Question · 1 vote · 1 answer

I am trying to run the Kafka Connect S3 sink Docker container on the same host as my Kafka Docker container. I want to partition the output by wall-clock time. However, Kafka Connect fails to start with the following error:

ERROR [Worker clientId=connect-1, groupId=kkconnect] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.connect.storage.partitioner.TimeBasedPartitioner for configuration partitioner.class: Class io.confluent.connect.storage.partitioner.TimeBasedPartitioner could not be found.

Here is the command I use to start that container:

docker run -d \
  --name=my-kkc \
  --net=host \
  --restart unless-stopped \
  --log-driver=awslogs \
  --log-opt awslogs-stream=kk-cn-1 \
  --log-opt awslogs-region=ap-southeast-2 \
  --log-opt awslogs-group=kafka-cn-1 \
  -e CONNECT_BOOTSTRAP_SERVERS=xxx.xxx.xxx.xxx:9092 \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_REST_PORT=28082 \
  -e CONNECT_GROUP_ID="kkconnect" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="kkconnect-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="kkconnect-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="kkconnect-status" \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
  -e CONNECT_SCHEMA_COMPATIBILITY=NONE \
  -e CONNECT_PLUGIN_PATH=/usr/share/java \
  -e CONNECT_NAME="s3-sink" \
  -e CONNECT_CONNECTOR_CLASS="io.confluent.connect.s3.S3SinkConnector" \
  -e CONNECT_TASKS_MAX=1 \
  -e CONNECT_TOPICS="mytopic" \
  -e CONNECT_ROTATE_INTERVAL_MS=60000 \
  -e CONNECT_FLUSH_SIZE=3 \
  -e CONNECT_S3_BUCKET_NAME="xxx-xxx" \
  -e CONNECT_S3_PART_SIZE=5242880 \
  -e CONNECT_STORAGE_CLASS="io.confluent.connect.s3.storage.S3Storage" \
  -e CONNECT_FORMAT_CLASS="io.confluent.connect.s3.format.bytearray.ByteArrayFormat" \
  -e CONNECT_FORMAT_BYTEARRAY_EXTENSION=json \
  -e CONNECT_PARTITIONER_CLASS="io.confluent.connect.storage.partitioner.TimeBasedPartitioner" \
  -e CONNECT_PARTITION_DURATION_MS=60000 \
  -e CONNECT_PATH_FORMAT="'kafka/mytopic/year'=YYYY/'month'=MM/'day'=dd/'hour'=HH" \
  -e CONNECT_TIMESTAMP_EXTRACTOR=Wallclock \
  -e CONNECT_S3_COMPRESSION_TYPE=gzip \
  confluentinc/cp-kafka-connect:5.3.0

Could you please help?

Thanks and regards, Averell

docker amazon-s3 apache-kafka-connect partition
1 Answer

0 votes

As @OneCricketeer mentioned in the comments, you should keep CONNECT_PLUGIN_PATH as a container environment variable, but submit all of the connector-specific parameters in a POST request to the Kafka Connect REST API rather than passing them as environment variables. (The cp-kafka-connect image turns every CONNECT_* environment variable into a worker configuration property, so connector-level settings such as partitioner.class end up in the worker config, which is presumably why the worker fails to resolve the partitioner class at startup.)
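
For illustration, a trimmed-down start command keeping only the worker-level variables might look like this (a sketch: all values are copied from the question, and the awslogs logging options are left out for brevity):

docker run -d \
  --name=my-kkc \
  --net=host \
  --restart unless-stopped \
  -e CONNECT_BOOTSTRAP_SERVERS=xxx.xxx.xxx.xxx:9092 \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_REST_PORT=28082 \
  -e CONNECT_GROUP_ID="kkconnect" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="kkconnect-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="kkconnect-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="kkconnect-status" \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.converters.ByteArrayConverter" \
  -e CONNECT_PLUGIN_PATH=/usr/share/java \
  confluentinc/cp-kafka-connect:5.3.0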

So, once you have spun the container up with the worker-level variables (including CONNECT_PLUGIN_PATH) and without the connector-level ones, run this command:

curl -d @path/to/your-config.json -H "Content-Type: application/json" -H "Accept: application/json" -X POST http://localhost:28082/connectors

where path/to/your-config.json would look something like this:

{
    "name": "s3-sink",
    "config": {
        "name": "s3-sink",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "mytopic",
        // and so on...
    }
}
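
For completeness, each connector-level CONNECT_* variable from the original docker command maps back to a connector property by dropping the CONNECT_ prefix, lowercasing, and turning underscores into dots (CONNECT_PARTITIONER_CLASS becomes partitioner.class, and so on). A full config built that way would presumably look like the sketch below. Note that TimeBasedPartitioner also requires locale and timezone, which were not set in the original command; the values shown for them here are placeholders, and depending on your bucket's region you may also need to set s3.region:

{
    "name": "s3-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "mytopic",
        "rotate.interval.ms": "60000",
        "flush.size": "3",
        "s3.bucket.name": "xxx-xxx",
        "s3.part.size": "5242880",
        "s3.compression.type": "gzip",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
        "format.bytearray.extension": "json",
        "schema.compatibility": "NONE",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "partition.duration.ms": "60000",
        "path.format": "'kafka/mytopic/year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
        "timestamp.extractor": "Wallclock",
        "locale": "en-AU",
        "timezone": "UTC"
    }
}

Once the POST succeeds, you can verify that the connector and its task are running with:

curl http://localhost:28082/connectors/s3-sink/status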