我已经在本地设置中尝试 cogito 无服务器工作流程一段时间了。在我的设置中,我使用 Kafka 定义了涉及 AsyncAPI 调用的状态的工作流程。这在本地运行良好,使用本地 Kafka 设置,无需任何身份验证机制。
当我尝试进一步推广并进一步尝试对远程 Kafka 集群进行相同的操作时,我遇到了一个问题,它似乎按预期进行了授权,但它没有轮询事件来触发工作流程。当我尝试使用 SASL_SSL 身份验证机制的 Kafka 集群时,就会发生这种情况。我尝试使用另一个远程 Kafka 集群,该集群使用基于证书的授权,并且似乎工作正常。我试图了解 Kafka 支持的身份验证机制是否有任何限制。
我得出的结论是,它正在按预期进行授权,因为当我在“kafka.sasl.jaas.config”值中尝试错误的用户名/密码时,它给出了正确的身份验证失败错误,但是当我更正它时,它没有给出任何错误错误并打印日志,如底部所示。但即使日志提到消费者已启动,但没有消费者组在范围内为该主题注册,因此那里出了问题。 Kafka 代理没有问题,因为我尝试连接和消费/生成具有相同属性的消息,并且工作正常。
有人可以看一下并提供您的意见吗?
工作流服务中用于本地 Kafka 连接的属性:
kafka.bootstrap.servers=127.0.0.1:9092
用于远程 Kafka 连接(无法正常工作)的属性:
kafka.bootstrap.servers=<brokers>
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=SCRAM-SHA-512
kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";
//Tried with and without the below properties and same result in both cases
kafka.ssl.truststore.location=<truststorepath>
kafka.ssl.truststore.password=<truststorepassword>
日志:
17:03:26 INFO traceId=, parentId=, spanId=, sampled= [io.sm.re.me.kafka] (Quarkus Main Thread) SRMSG18229: Configured topics for channel '<channelName>': [<topicName>]
17:03:27 INFO traceId=, parentId=, spanId=, sampled= [io.sm.re.me.kafka] (smallrye-kafka-consumer-thread-4) SRMSG18257: Kafka consumer kafka-consumer-<channelName/topicName>, connected to Kafka brokers '<brokersList>', belongs to the 'sw-xyz-in8' consumer group and is configured to poll records from [<topicName>]
17:03:27 INFO traceId=, parentId=, spanId=, sampled= [or.ki.ko.ev.im.AbstractMessageConsumer] (Quarkus Main Thread) Consumer for <channelName/topicName> started