AWS MSK Connect MirrorMaker2 - Failed to flush


When running MirrorMaker2 on MSK Connect (2.7.1), the source connector throws the following errors:

[Worker-0d8c5a576b5ef6e99] [2023-12-22 16:01:00,771] ERROR [msk-dev-conv-mm2-sourceconnector|task-1|offsets] WorkerSourceTask{id=msk-dev-conv-mm2-sourceconnector-1} Failed to flush, timed out while waiting for producer to flush outstanding 3 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:509)
[Worker-0d8c5a576b5ef6e99] [2023-12-22 16:01:00,771] ERROR [msk-dev-conv-mm2-sourceconnector|task-1|offsets] WorkerSourceTask{id=msk-dev-conv-mm2-sourceconnector-1} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:116)

Cluster settings:

allow.everyone.if.no.acl.found = false
auto.create.topics.enable = true
delete.topic.enable = true
log.cleaner.delete.retention.ms = 86400000
log.cleanup.policy = compact
log.retention.hours = -1
message.max.bytes = 5242940
min.insync.replicas = 2
unclean.leader.election.enable = false

MirrorMaker source connector settings:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
errors.log.include.messages=false
replication.factor=3
source.cluster.ssl.truststore.location=${s3import:ca-central-1:msk-manual-msk-configurations/test.truststore.jks}
target.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;
sync.topic.acls.enabled=false
tasks.max=16
source.cluster.alias=
sync.topic.configs.interval.seconds=20
target.cluster.security.protocol=SASL_SSL
replication.policy.separator=
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
refresh.groups.interval.seconds=20
refresh.topics.interval.seconds=20
offset-syncs.topic.replication.factor=3
ssl.protocol=TLS
consumer.group.id=mm2-dev
target.cluster.sasl.mechanism=AWS_MSK_IAM
topics=axis.*|ausmle-test
target.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
producer.enable.idempotence=true
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='123' password='123';
source.cluster.bootstrap.servers=10.0.0.1:19093,10.0.0.1:19094,10.0.0.1:19095
source.cluster.sasl.mechanism=SCRAM-SHA-512
target.cluster.alias=
target.cluster.bootstrap.servers=b-1.msk.123.c3.kafka.ca-central-1.amazonaws.com:9098,b-2.msk.123.c3.kafka.ca-central-1.amazonaws.com:9098,b-3.msk.123.c3.kafka.ca-central-1.amazonaws.com:9098
source.cluster.ssl.truststore.password=123
sync.topic.configs.enabled=true
source.cluster.security.protocol=SASL_SSL
source.cluster.ssl.endpoint.identification.algorithm=

Tags: apache-kafka-connect, aws-msk, apache-kafka-mirrormaker, aws-msk-connect

1 Answer

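The problem was caused by mirroring topics whose messages have no key into a target cluster that has the cluster setting

log.cleanup.policy="compact"

Although the source connector configuration sets

sync.topic.configs.enabled=true

the source topic's cleanup policy was not synced to the target topic. The target topic was therefore created with the cluster default cleanup policy "compact". A compacted topic rejects records that have a null key, so when MirrorMaker tried to flush the keyless messages the producer never succeeded, and the flush and the offset commit failed. This may be related to https://issues.apache.org/jira/browse/KAFKA-9459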
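
The reasoning above can be sketched in a few lines of Python. This is only a simplified model and not broker code: the function names are hypothetical, and the rule that a topic-level cleanup.policy overrides the cluster default while a compacted topic rejects null keys is stated here as the assumption behind the diagnosis.

```python
from typing import Optional

# Cluster-level default taken from the target cluster settings in the question.
CLUSTER_DEFAULT_CLEANUP_POLICY = "compact"


def effective_cleanup_policy(
    topic_override: Optional[str],
    cluster_default: str = CLUSTER_DEFAULT_CLEANUP_POLICY,
) -> str:
    """Return the topic-level policy if one is set, else the cluster default."""
    return topic_override if topic_override is not None else cluster_default


def broker_accepts(key: Optional[bytes], cleanup_policy: str) -> bool:
    """Simplified model: a policy that includes 'compact' rejects null keys."""
    policies = [p.strip() for p in cleanup_policy.split(",")]
    return not ("compact" in policies and key is None)


if __name__ == "__main__":
    # The policy was not synced, so the mirrored topic has no override.
    policy = effective_cleanup_policy(None)
    print(policy, broker_accepts(None, policy))       # compact False
    # With an explicit "delete" override, keyless records are accepted.
    policy = effective_cleanup_policy("delete")
    print(policy, broker_accepts(None, policy))       # delete True
```

In this model the keyless records are only rejected because the mirrored topic falls back to the cluster default. That matches the behaviour described in the answer.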

Trying to push a message without a key to a compacted topic with the command-line client produces the following error:

ERROR Error when sending message to topic test-compact with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
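
The answer does not spell out a fix. One possible workaround, offered here as an assumption rather than as the author's method, is to give the target topic an explicit cleanup.policy=delete so that it no longer inherits the compacted cluster default. The sketch below only builds the argument list for Kafka's kafka-configs.sh tool. The helper name, the broker address, the topic name, and the client.properties file (which would carry the IAM client settings) are placeholders.

```python
import shlex
from typing import List


def build_alter_cleanup_policy_command(
    bootstrap_servers: str,
    topic: str,
    command_config: str,
    policy: str = "delete",
) -> List[str]:
    """Build a kafka-configs.sh invocation that sets a topic-level cleanup.policy."""
    return [
        "kafka-configs.sh",
        "--bootstrap-server", bootstrap_servers,
        "--command-config", command_config,  # client auth settings (placeholder file)
        "--alter",
        "--entity-type", "topics",
        "--entity-name", topic,
        "--add-config", f"cleanup.policy={policy}",
    ]


if __name__ == "__main__":
    # Placeholder values; substitute the real target brokers and mirrored topic name.
    cmd = build_alter_cleanup_policy_command(
        "b-1.example:9098", "ausmle-test", "client.properties"
    )
    print(shlex.join(cmd))
```

The printed command can be reviewed before it is run against the target cluster. Changing the cluster default log.cleanup.policy would be another option, but it affects every topic that has no override.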