Kafka SASL: using OAUTHBEARER and PLAIN at the same time

Votes: 0, Answers: 2

What I want to do is:

For Clients to Broker communication - use OAUTHBEARER authentication
For Broker to Broker communication - use PLAIN authentication

I have the following JAAS configuration:

{
  KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="inter"
    password="inter-secret"
    user_inter="inter-secret"
    user_admin="YvNzcbmqhA0DfxjP";

    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
  };

  Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zookeeper"
    password="zookeeper-secret";
  };
}

I have the following configuration in server.properties:

sasl.enabled.mechanisms=PLAIN,OAUTHBEARER
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.server.callback.handler.class=br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler

But if I start the Kafka service, I see the following error:

Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null JAAS mechanism configuration (size was 2)
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:114)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:122)
... 17 more

This suggests that Kafka does not allow specifying more than one JAAS mechanism configuration.

So how do I specify multiple JAAS configurations and set up the authentication mechanisms as follows:

Client to Broker ----> OAUTHBEARER
Broker to Broker ----> PLAIN

Thanks!

authentication oauth-2.0 apache-kafka jaas
2 answers

3 votes

I am also currently working on using plain and oauthbearer at the same time. I haven't solved that yet, but I solved your specific problem in the following way. This is my JAAS config:

internal.KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_test="test";
};

external.KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
};

Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="username"
   password="pw";
};

Then I set the settings in server.properties as follows:

  inter.broker.listener.name: INTERNAL 
  sasl.mechanism.inter.broker.protocol: PLAIN
  listener.security.protocol.map: INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_SSL
  listeners: "INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19092"
  sasl.enabled.mechanisms: PLAIN,OAUTHBEARER

  listener.name.external.oauthbearer.sasl.server.callback.handler.class: my.module.kafka.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler
  listener.name.external.oauthbearer.sasl.login.callback.handler.class: my.module.kafka.security.oauthbearer.OauthAuthenticateLoginCallbackHandler

When you do this, you won't get your error. Unfortunately, when the broker wants to set up the external connection, I get another error:

javax.security.auth.callback.UnsupportedCallbackException: Unrecognized SASL Login callback
    at org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler.handle(AbstractLogin.java:105)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
    ... 32 more

The Kafka broker seems to ignore the oauthbearer callback handler. This is a bit strange, because when I configure it as the only listener, external works fine.

Hope it helps you solve the problem!


0 votes

You can configure both mechanisms like this (if your OAuth uses AAD); it works fine in my brokers:

Broker configuration:

listeners=SASL://0.0.0.0:9770,SASL_CONTROLLER://0.0.0.0:9773,SASL_INTERNAL://0.0.0.0:9774
advertised.listeners=SASL://localhost:9770,SASL_CONTROLLER://localhost:9773,SASL_INTERNAL://localhost:9774
listener.security.protocol.map=SASL:SASL_PLAINTEXT,SASL_CONTROLLER:SASL_PLAINTEXT,SASL_INTERNAL:SASL_PLAINTEXT

sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
control.plane.listener.name = SASL_CONTROLLER
inter.broker.listener.name = SASL_INTERNAL

listener.name.sasl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="AdminName" password="AdminPassword" user_UserName="UserPassword";
listener.name.sasl.plain.sasl.server.callback.handler.class=com.XXXAuthenticateCallbackHandler
listener.name.sasl_controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="AdminName" password="AdminPassword" user_UserName="UserPassword";
listener.name.sasl_controller.plain.sasl.server.callback.handler.class=com.XXXAuthenticateCallbackHandler
listener.name.sasl_internal.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="AdminName" password="AdminPassword" user_UserName="UserPassword";
listener.name.sasl_internal.plain.sasl.server.callback.handler.class=com.XXXAuthenticateCallbackHandler

listener.name.sasl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId='BrokerApplicationId' clientSecret='BrokerApplicationSecret' scope='BrokerApplicationId/.default';
listener.name.sasl.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
listener.name.sasl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
listener.name.sasl_controller.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId='BrokerApplicationId' clientSecret='BrokerApplicationSecret' scope='BrokerApplicationId/.default';
listener.name.sasl_controller.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
listener.name.sasl_controller.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
listener.name.sasl_internal.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId='BrokerApplicationId' clientSecret='BrokerApplicationSecret' scope='BrokerApplicationId/.default';
listener.name.sasl_internal.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
listener.name.sasl_internal.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler

sasl.oauthbearer.jwks.endpoint.url=https://login.microsoftonline.com/BrokerTenantId/discovery/v2.0/keys
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/BrokerTenantId/oauth2/v2.0/token
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:AdminName

Client configuration:

Plain:

bootstrap.servers=xxxx:9770
compression.type=none
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="UserName" password="UserPassword";

OAuth:

bootstrap.servers=xxxx:9770
compression.type=none
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/ClientTenantId/oauth2/v2.0/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER
security.protocol=SASL_PLAINTEXT
sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId='ClientApplicationId' \
    scope='BrokerApplicationId/.default' \
    clientSecret='ClientApplicationSecret';
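As an added example (not from the question or either answer, and not Kafka's own code), here is a small Python linter for the per-listener layout both answers arrive at. It parses server.properties in either the `key=value` or `key: value` form shown above plus an optional static JAAS file, resolves each (listener, mechanism) pair the way the broker does (the per-listener `sasl.jaas.config` property first, then a `<listener>.KafkaServer` context, then the shared `KafkaServer` context), and reports the question's single shared context, a mechanism with no login module, an inter-broker mechanism that is not enabled on the inter-broker listener, PLAIN or OAUTHBEARER on a SASL_PLAINTEXT listener (credentials and tokens cross the wire unencrypted), and `allow.everyone.if.no.acl.found=true`. The sample inputs are constructed; the listener-prefixed `sasl.enabled.mechanisms` override it uses is a real Kafka property that does not appear in the thread.

```python
"""Lint the listener / SASL layout of a Kafka broker configuration.

Added example for this thread, not Kafka's own code. All inputs below are
constructed; jaas_source() mirrors the precedence the broker uses for JAAS entries.
"""
import re

MODULE_TO_MECH = {"PlainLoginModule": "PLAIN", "OAuthBearerLoginModule": "OAUTHBEARER"}
CLEARTEXT_MECHS = {"PLAIN", "OAUTHBEARER"}  # password / bearer token are sent as-is


def parse_properties(text):
    """Parse `key=value` or `key: value` lines, honouring '\\' line continuations."""
    props, pending = {}, ""
    for raw in text.splitlines():
        line = pending + raw.strip()
        pending = ""
        if not line or line.startswith("#"):
            continue
        if line.endswith("\\"):
            pending = line[:-1]
            continue
        m = re.match(r"^([^=:\s]+)\s*[=:]\s*(.*)$", line)
        if m:
            props[m.group(1)] = m.group(2).strip().strip('"')
    return props


def parse_jaas(text):
    """Return {context name: [login module simple names]} from a static JAAS file."""
    return {name: re.findall(r"([A-Za-z]+LoginModule)\s+required", body)
            for name, body in re.findall(r"([\w.]+)\s*\{(.*?)\}\s*;", text, re.S)}


def jaas_source(props, jaas, listener, mech):
    """Where the broker finds the login module for (listener, mech): the per-listener
    sasl.jaas.config property, else <listener>.KafkaServer, else KafkaServer."""
    if f"listener.name.{listener.lower()}.{mech.lower()}.sasl.jaas.config" in props:
        return "property", [mech]
    for ctx in (f"{listener.lower()}.KafkaServer", "KafkaServer"):
        if ctx in jaas:
            return ctx, [MODULE_TO_MECH.get(m, m) for m in jaas[ctx]]
    return None, []


def lint(props, jaas=None):
    """Return a list of findings; an empty list means the layout is consistent."""
    jaas, findings = jaas or {}, []
    listeners = [l.split("://")[0].strip() for l in props.get("listeners", "").split(",") if l.strip()]
    proto_map = {k: v for k, _, v in (p.strip().partition(":") for p in
                 props.get("listener.security.protocol.map", "").split(",") if p.strip())}
    inter = props.get("inter.broker.listener.name")
    inter_mech = props.get("sasl.mechanism.inter.broker.protocol")

    for lst in listeners:
        proto = proto_map.get(lst, lst)  # without a map the listener name is the protocol
        if proto_map and lst not in proto_map:
            findings.append(f"{lst}: missing from listener.security.protocol.map")
        if not proto.startswith("SASL_"):
            continue
        # sasl.enabled.mechanisms may be overridden per listener with the listener.name.<l>. prefix
        enabled = props.get(f"listener.name.{lst.lower()}.sasl.enabled.mechanisms",
                            props.get("sasl.enabled.mechanisms", ""))
        mechs = [m.strip() for m in enabled.split(",") if m.strip()]
        for mech in mechs:
            src, modules = jaas_source(props, jaas, lst, mech)
            if src is None or mech not in modules:
                findings.append(f"{lst}/{mech}: no JAAS login module configured")
            elif mech == "OAUTHBEARER" and len(modules) > 1:
                findings.append(f"{lst}/{mech}: context {src} holds {len(modules)} login modules, "
                                "but the default OAUTHBEARER validator accepts exactly one")
            if proto == "SASL_PLAINTEXT" and mech in CLEARTEXT_MECHS:
                findings.append(f"{lst}/{mech}: credentials or tokens cross the wire unencrypted")
        if lst == inter and inter_mech and inter_mech not in mechs:
            findings.append(f"{lst}: inter-broker mechanism {inter_mech} is not enabled here")

    if inter and inter not in listeners:
        findings.append(f"inter.broker.listener.name={inter} is not a declared listener")
    if props.get("allow.everyone.if.no.acl.found", "false").lower() == "true":
        findings.append("allow.everyone.if.no.acl.found=true: any authenticated principal "
                        "may act on resources that have no ACL")
    return findings


# Constructed: the question's layout, one KafkaServer context shared by both mechanisms.
BROKEN_PROPS = """listeners=SASL_PLAINTEXT://0.0.0.0:9092
sasl.enabled.mechanisms=PLAIN,OAUTHBEARER
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=SASL_PLAINTEXT"""
BROKEN_JAAS = """KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="inter" password="CHANGEME" user_inter="CHANGEME";
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
};"""
# Constructed: one listener per mechanism, every JAAS entry bound to its listener.
FIXED_PROPS = """listeners=EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092
listener.security.protocol.map=EXTERNAL:SASL_SSL,INTERNAL:SASL_SSL
sasl.enabled.mechanisms=PLAIN,OAUTHBEARER
listener.name.external.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.internal.sasl.enabled.mechanisms=PLAIN
inter.broker.listener.name=INTERNAL
sasl.mechanism.inter.broker.protocol=PLAIN
listener.name.internal.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="inter" password="CHANGEME" user_inter="CHANGEME";
listener.name.external.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"""

if __name__ == "__main__":
    print("question layout:", lint(parse_properties(BROKEN_PROPS), parse_jaas(BROKEN_JAAS)))
    print("per-listener layout:", lint(parse_properties(FIXED_PROPS)) or "no findings")
```

Run it against a real server.properties and JAAS file by reading them into `parse_properties` and `parse_jaas`; the first sample reproduces the "exactly 1" situation from the question, the second resolves every mechanism through a listener-scoped property and produces no findings.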