我怎样才能获得具有exactly_once_v2保证的Kafka Streams应用程序以与AWS MSK一起使用

问题描述 投票:0回答:1

当我配置并启用

processing.guarantee=exactly_once_v2
时,我在运行 Spring Boot Kafka Streams 应用程序时遇到问题。当我启动我的应用程序时,它最终崩溃(我的所有流线程都关闭),并且记录了以下异常:

[Producer clientId=my-application-c170d5b4-ffe4-4734-a3e4-41b068b21060-StreamThread-2-producer, transactionalId=my-application-c170d5b4-ffe4-4734-a3e4-41b068b21060-2] Transiting to fatal error state due to org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed. 

org.apache.kafka.streams.errors.StreamsException: Error encountered trying to initialize transactions [stream-thread [main]]
at org.apache.kafka.streams.processor.internals.StreamsProducer.initTransaction(StreamsProducer.java:169)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.initialize(RecordCollectorImpl.java:93)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:229)
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed.

我的配置如下:

        spring:
          kafka:
            producer:
              bootstrap-servers: <urls>
            properties:
              security.protocol: SASL_SSL
              sasl.mechanism: AWS_MSK_IAM
              sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
              sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
            streams:
              application-id: my-application
              bootstrap-servers: <urls>
              replicationFactor: 3
              properties:
                acks: all
                retries: 3
                processing:
                  guarantee: exactly_once_v2
                num:
                  stream:
                    threads: 3

当我删除exactly_once_v2保证时,它能够正常工作。

在 AWS 中,我使用以下策略声明:

{
    "Effect": "Allow",
    "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster",
        "kafka-cluster:DescribeClusterDynamicConfiguration",
        "kafka-cluster:ReadData",
        "kafka-cluster:WriteData",
        "kafka-cluster:*Topic*",
        "kafka-cluster:WriteDataIdempotently
        "kafka-cluster:DescribeTransactionalId",
        "kafka-cluster:AlterTransactionalId",
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
    ],
    "Resource": [
        "arn:aws:kafka:<arn>:cluster/my-cluster/*",
        "arn:aws:kafka:<arn>:topic/my-cluster/*",
        "arn:aws:kafka:<arn>:transactional-id/my-cluster/*",
        "arn:aws:kafka:<arn>:group/my-cluster/*",
    ]
}

我也尝试过:

{
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:*"
           ],
           "Resource": [
               "arn:aws:kafka:<arn>:*/my-cluster/*"
           ]
}

但这也不起作用。根据汇合文档中定义的 RBAC 角色绑定,我认为我在集群中设置了正确的权限,但事实似乎并非如此。

是否有人对潜在的 IAM 政策声明或我可能缺少的其他配置有任何见解,以使 EOS 适用于我的 kafka 流应用程序?

amazon-web-services apache-kafka spring-kafka apache-kafka-streams aws-msk
1个回答
0
投票

你能解决这个问题吗?我现在遇到了同样的问题。

© www.soinside.com 2019 - 2024. All rights reserved.