AWS MSK 事务支持

问题描述 投票:0回答:1

我正在尝试在 Lambda 函数内创建一个 Kafka Producer,并启用 Exactly-Once Delivery 支持以将消息推送到 MSK。

编辑:MSK IAM Auth 用于 Kafka 和客户端之间的安全协议

但是,即使(我认为)我已经正确设置了所有配置,Producer 仍然无法将消息写入 MSK。

生产者在调用 init_transactions() 时挂起,并循环输出以下调试消息:

%7|1708348719.300|TXNCOORD|lambda#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (3 broker(s) known)

2024-02-19T14:18:39.338+01:00   %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker in state TRY_CONNECT connecting

2024-02-19T14:18:39.338+01:00   %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker has no address yet: postponing connect

2024-02-19T14:18:39.800+01:00   %7|1708348719.800|CONNECT|lambda#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID

2024-02-19T14:18:39.800+01:00   %7|1708348719.800|PIDBROKER|lambda#producer-1| [thrd:main]: No brokers available for Transactions (3 broker(s) known)

我尝试更改代理的数量(从 2 到 4 - 不起作用),尝试设置 transaction.state.log.replication.factortransaction.state.log.min.isr 的值,offsets.topic.replication.factor(即使将它们全部设置为 1 - 也没有帮助)。 此线程的建议也没有帮助Amazon MSK 默认配置和事务发布的问题

我的 AWS MSK 集群有以下配置和设置:

  • 跨 2 个可用区的 4 个经纪商
  • 卡夫卡2.8.1
  • 集群大小:kafka.t3.small(我检查过,kafka.m5.large 上也出现同样的情况)

集群配置:

transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
offsets.topic.replication.factor=3
min.insync.replicas=2
default.replication.factor=3
auto.create.topics.enable=true
num.io.threads=8
num.network.threads=2
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.session.timeout.ms=18000

生产者配置:

  • 使用 Confluence Kafka for Python 创建 Producer
{
  "client.id": "some_id",
  "acks": "all",
  "enable.idempotence": "true",
  "transactional.id": "123",
}

附注如果我不使用 ack、enable.idempotence 和 transactional.id - Producer 工作正常,但它违背了首先提出此问题的目的。

UPD:在挖掘 MSK 经纪商的日志后,似乎没有建立连接 - 不知道为什么,特别是因为事务性和非事务性生产者的身份验证方法是相同的,而对于非事务性生产者,连接是建立得很好..

apache-kafka kafka-producer-api aws-msk kafka-transactions-api
1个回答
0
投票

MSK 的 AWS IAM Auth 会以某种方式干扰调用 init_transactions(),因为不使用它而仅使用 PLAINTEXT 就可以正常工作。 不确定为什么这不能与 IAM Auth 一起使用,也许其他人可以提供建议。目前,此用例无法使用 AWS IAM Auth。

© www.soinside.com 2019 - 2024. All rights reserved.