当我配置并启用
processing.guarantee=exactly_once_v2
时,我在运行 Spring Boot Kafka Streams 应用程序时遇到问题。当我启动我的应用程序时,它最终崩溃(我的所有流线程都关闭),并且记录了以下异常:
[Producer clientId=my-application-c170d5b4-ffe4-4734-a3e4-41b068b21060-StreamThread-2-producer, transactionalId=my-application-c170d5b4-ffe4-4734-a3e4-41b068b21060-2] Transiting to fatal error state due to org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed.
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to initialize transactions [stream-thread [main]]
at org.apache.kafka.streams.processor.internals.StreamsProducer.initTransaction(StreamsProducer.java:169)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.initialize(RecordCollectorImpl.java:93)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:229)
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed.
我的配置如下:
spring:
kafka:
producer:
bootstrap-servers: <urls>
properties:
security.protocol: SASL_SSL
sasl.mechanism: AWS_MSK_IAM
sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
streams:
application-id: my-application
bootstrap-servers: <urls>
replicationFactor: 3
properties:
acks: all
retries: 3
processing:
guarantee: exactly_once_v2
num:
stream:
threads: 3
当我删除exactly_once_v2保证时,它能够正常工作。
在 AWS 中,我使用以下策略声明:
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster",
"kafka-cluster:DescribeClusterDynamicConfiguration",
"kafka-cluster:ReadData",
"kafka-cluster:WriteData",
"kafka-cluster:*Topic*",
"kafka-cluster:WriteDataIdempotently
"kafka-cluster:DescribeTransactionalId",
"kafka-cluster:AlterTransactionalId",
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:<arn>:cluster/my-cluster/*",
"arn:aws:kafka:<arn>:topic/my-cluster/*",
"arn:aws:kafka:<arn>:transactional-id/my-cluster/*",
"arn:aws:kafka:<arn>:group/my-cluster/*",
]
}
我也尝试过:
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*"
],
"Resource": [
"arn:aws:kafka:<arn>:*/my-cluster/*"
]
}
但这也不起作用。根据汇合文档中定义的 RBAC 角色绑定,我认为我在集群中设置了正确的权限,但事实似乎并非如此。
是否有人对潜在的 IAM 政策声明或我可能缺少的其他配置有任何见解,以使 EOS 适用于我的 kafka 流应用程序?
你能解决这个问题吗?我现在遇到了同样的问题。