带有 MSSQL Debezium 连接器的 AWS MSK Connect 因断开连接而失败

问题描述 投票:0回答:1

我正在尝试使用 AWS MSK Connect 设置 mssql debezium 连接器,但不断收到以下错误消息:

连接器错误日志:

[Worker-0a949760f6b805d4f] [2023-02-15 19:57:56,122] WARN [src-connector-014|task-0] [Consumer clientId=dlp.compcare.ccdemo-schemahistory, groupId=dlp.compcare.ccdemo- schemahistory] Bootstrap 代理 b-3.stuff.morestuff.c7.kafka.us-east-1.amazonaws.com:9098 (id: -2 rack: null) 断开连接 (org.apache.kafka.clients.NetworkClient:1079)

此错误持续发生一段时间然后我看到此错误:

org.apache.kafka.common.errors.TimeoutException:获取主题元数据时超时

在集群日志中,当我收到断开连接错误时,我会看到相应的错误:

[2023-02-15 20:08:21,627] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] 使用 /172.32.34.126 进行身份验证失败(SSL 握手失败)(org.apache.kafka.common.network.Selector)

我有一个 ec2 客户端,我已将其设置为连接到我的集群,并且能够使用 IAM 身份验证连接并针对集群运行命令。我已经设置了一个主题,并使用控制台生产者/消费者从该主题中生产和消费。我还验证了当连接器启动时它正在创建

__amazon_msk_connect_status_*
__amazon_msk_connect_offsets_*
主题。

我已经通过检查它附加到的弹性网络接口来验证日志中的 ip 是分配给我的连接器的 ip。

同样出于测试目的,我为它们运行的 SG 打开了来自 0.0.0.0/0 的所有流量,并确保 IAM 角色具有 msk*、msk-connect*、kafka* 和 s3*。

我还验证了 RDS 上启用了 CDC,并且它工作正常。我看到正在选择更改并将其添加到 CDC 表中。

我认为这个问题仍然与 IAM 身份验证有关,但不确定。 集群配置:

auto.create.topics.enable=true
delete.topic.enable=true

工人配置:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers=secretManager
config.providers.secretManager.param.aws.region=us-east-1
request.timeout.ms=90000
errors.log.enable=true
errors.log.include.messages=true

连接器配置:

connector.class=io.debezium.connector.sqlserver.SqlServerConnector
tasks.max=1
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.include.list=dbo
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.consumer.security.protocol=SASL_SSL
database.instance=MSSQLSERVER
topic.prefix=dlp.compcare.ccdemo
schema.history.internal.kafka.topic=dlp.compcare.ccdemo.history
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
database.history.sasl.mechanism=AWS_MSK_IAM
database.encrypt=false
database.history.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.user=debezium
database.names=Intermodal_CCDEMO
database.history.producer.security.protocol=SASL_SSL
database.server.name=ccdemo_1
schema.history.internal.kafka.bootstrap.servers=b-1:9098
database.port=1433
database.hostname=my-mssql-rds.rds.amazonaws.com
database.history.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.password=${secretManager:dlp-compcare:dbpassword}
table.include.list=dbo.EquipmentSetup
database.history.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM

我能够执行相同的过程,但是使用 postgres rds 没有任何问题。

我已经尝试了我能想到的一切,所以任何帮助都将不胜感激!

我在设置集群/连接器时也参考了以下内容:

sql-server apache-kafka apache-kafka-connect debezium aws-msk
1个回答
0
投票

我在连接到 Aurora MYSQL DB 时遇到了同样的问题。

我使用的是最新版本的 Debezium v2.1.2

然而,当我使用以前的版本v1.7.0时使用相同的设置,它工作得很好。

我注意到版本 v1.7.0 接受以下参数:

database.history.kafka.bootstrap.servers=<masked>
database.history.kafka.topic=<masked>

可以在v1.7的Mysql示例中看到here.

但是最新版本抱怨以上两个并请求一个名为

topic.prefix
的新参数:

schema.history.internal.kafka.topic=<masked>
schema.history.internal.kafka.bootstrap.servers=<masked>
topic.prefix=<masked>

可以在 v2.1 的 Mysql 示例中看到here.

总而言之,即使在修复配置更改后,它也适用于 v1.7,但不适用于 v2.1.2。这意味着,要么是错误,要么我们缺少一些额外的配置。

如果我能弄清楚,我会及时通知您。

同时,你也可以试试以前的版本吗?

© www.soinside.com 2019 - 2024. All rights reserved.