Kafka OAuth: Unrecognized SASL Login callback

Question (votes: 0, answers: 1)

What I am trying to do:

Set up SASL/PLAIN for inter-broker communication.

Set up SASL/PLAIN for broker-ZooKeeper communication.

Set up SASL/OAUTHBEARER for client-broker communication.

My server_jaas.conf is:

KafkaServer { 
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    LoginStringClaim_sub="admin";

    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zookeeper"
    password="zookeeper_secret";
};

I have also set the following configuration options:

sasl.enabled.mechanisms=PLAIN,OAUTHBEARER
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.server.callback.handler.class=br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler

I am using the jars provided here: https://github.com/jairsjunior/kafka-playground/tree/master/kafka-broker/kafka-with-oauth/libs

But when I start the Kafka server, I get this error:

Unrecognized SASL Login callback (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
javax.security.auth.callback.UnsupportedCallbackException: Unrecognized SASL Login callback

at org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler.handle(AbstractLogin.java:105)
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:261)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:64)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:114)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:82)
at kafka.network.Processor.<init>(SocketServer.scala:548)
at kafka.network.SocketServer.newProcessor(SocketServer.scala:247)
at kafka.network.SocketServer.$anonfun$addProcessors$1(SocketServer.scala:163)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:156)
at kafka.network.SocketServer.addProcessors(SocketServer.scala:162)
at kafka.network.SocketServer.$anonfun$createAcceptorAndProcessors$1(SocketServer.scala:150)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.SocketServer.createAcceptorAndProcessors(SocketServer.scala:145)
at kafka.network.SocketServer.startup(SocketServer.scala:94)
at kafka.server.KafkaServer.startup(KafkaServer.scala:250)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
at kafka.Kafka$.main(Kafka.scala:75)
at kafka.Kafka.main(Kafka.scala)



[2019-05-24 12:30:30,414] ERROR [KafkaServer id=240] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: An internal error occurred
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:82)
at kafka.network.Processor.<init>(SocketServer.scala:548)
at kafka.network.SocketServer.newProcessor(SocketServer.scala:247)
at kafka.network.SocketServer.$anonfun$addProcessors$1(SocketServer.scala:163)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:156)
at kafka.network.SocketServer.addProcessors(SocketServer.scala:162)
at kafka.network.SocketServer.$anonfun$createAcceptorAndProcessors$1(SocketServer.scala:150)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.SocketServer.createAcceptorAndProcessors(SocketServer.scala:145)
at kafka.network.SocketServer.startup(SocketServer.scala:94)
at kafka.server.KafkaServer.startup(KafkaServer.scala:250)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
at kafka.Kafka$.main(Kafka.scala:75)
at kafka.Kafka.main(Kafka.scala)



Caused by: javax.security.auth.login.LoginException: An internal error occurred
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:264)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:64)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:114)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)
... 17 more

What am I missing here? Any help would be greatly appreciated.

Thanks!

oauth-2.0 apache-kafka apache-zookeeper sasl
1 Answer
0 votes

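sasl.login.callback.handler.class specifies the class that the login module uses to request a token. It has to be configured on both the broker and the client. (I don't know why the broker needs it as well, but in my tests the broker reported an error unless it was set there too.)

sasl.server.callback.handler.class specifies the class that validates, on the server side, the token sent by the client. It has to be configured on the broker.

Both handler classes need to be configured. For example: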

listener.name.sasl.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
listener.name.sasl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
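
The stack trace in the question is consistent with this: the broker itself calls OAuthBearerLoginModule.login during startup, and the callback ends up in AbstractLogin$DefaultLoginCallbackHandler, which rejects it. Below is a minimal sketch of that JAAS mechanism using only JDK classes. TokenCallback, DefaultHandler, TokenHandler and login are hypothetical stand-ins written for illustration; they are not Kafka classes and do not reproduce Kafka's actual implementation.

```java
import java.io.IOException;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;

public class CallbackDemo {
    // Hypothetical stand-in for a token callback issued by an OAuth login module.
    static class TokenCallback implements Callback {
        String token;
    }

    // Stand-in for a generic handler that only understands name callbacks.
    static class DefaultHandler implements CallbackHandler {
        @Override
        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
            for (Callback cb : callbacks) {
                if (cb instanceof NameCallback) {
                    ((NameCallback) cb).setName("admin");
                } else {
                    throw new UnsupportedCallbackException(cb, "Unrecognized SASL Login callback");
                }
            }
        }
    }

    // Stand-in for a handler that knows how to supply a token.
    static class TokenHandler implements CallbackHandler {
        @Override
        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
            for (Callback cb : callbacks) {
                if (cb instanceof TokenCallback) {
                    ((TokenCallback) cb).token = "placeholder-token";
                } else {
                    throw new UnsupportedCallbackException(cb);
                }
            }
        }
    }

    // Mimics a login module: it hands a token callback to whichever handler is configured.
    static String login(CallbackHandler handler) {
        TokenCallback cb = new TokenCallback();
        try {
            handler.handle(new Callback[] {cb});
            return cb.token;
        } catch (UnsupportedCallbackException | IOException e) {
            return "login failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(login(new DefaultHandler())); // login failed: Unrecognized SASL Login callback
        System.out.println(login(new TokenHandler()));   // placeholder-token
    }
}
```

The login only succeeds when the handler paired with the login module recognizes the callback type, which is why a login callback handler that understands OAuth token callbacks has to be set wherever OAuthBearerLoginModule runs.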

A complete configuration example (two mechanisms, OAuth and PLAIN; OAuth uses AAD tokens):

Broker configuration:

listeners=SASL://x.x.x.x:9770
advertised.listeners=SASL://x.x.x.x:9770
listener.security.protocol.map=SASL:SASL_PLAINTEXT

sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=OAUTHBEARER,PLAIN

listener.name.sasl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="AdminName" password="AdminPassword" user_UserName="UserPassword";
listener.name.sasl.plain.sasl.server.callback.handler.class=com.XXXAuthenticateCallbackHandler

listener.name.sasl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId='BrokerApplicationId' clientSecret='BrokerApplicationSecret' scope='BrokerApplicationId/.default';
listener.name.sasl.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
listener.name.sasl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler

sasl.oauthbearer.jwks.endpoint.url=https://login.microsoftonline.com/BrokerTenantId/discovery/v2.0/keys
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/BrokerTenantId/oauth2/v2.0/token

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:AdminName

Client configuration:

PLAIN:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="UserName" password="UserPassword";

OAuth:

bootstrap.servers=xxxx:xxxx
compression.type=none
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/ClientTenantId/oauth2/v2.0/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER
security.protocol=SASL_PLAINTEXT
sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId='ClientApplicationId' \
    scope='BrokerApplicationId/.default' \
    clientSecret='ClientApplicationSecret';
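
For a Java client, the same OAuth settings can be assembled in code instead of a properties file. The following is a minimal sketch using only java.util.Properties; the class OAuthClientConfig and its methods are hypothetical names, and the values are the placeholders from the listing above. Handing the result to a producer or consumer additionally needs kafka-clients on the classpath, which is not shown here.

```java
import java.util.Properties;

public class OAuthClientConfig {
    // Builds the single-line sasl.jaas.config value from the listing above.
    static String jaasConfig(String clientId, String clientSecret, String scope) {
        return "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
            + " clientId='" + clientId + "'"
            + " scope='" + scope + "'"
            + " clientSecret='" + clientSecret + "';";
    }

    // Collects the client-side OAUTHBEARER settings under the same keys as the listing.
    static Properties build(String bootstrapServers, String tenantId,
                            String clientId, String clientSecret, String scope) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.login.callback.handler.class",
            "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
        props.setProperty("sasl.oauthbearer.token.endpoint.url",
            "https://login.microsoftonline.com/" + tenantId + "/oauth2/v2.0/token");
        props.setProperty("sasl.jaas.config", jaasConfig(clientId, clientSecret, scope));
        return props;
    }

    public static void main(String[] args) {
        // All arguments are placeholders; substitute real values before use.
        Properties props = build("xxxx:xxxx", "ClientTenantId", "ClientApplicationId",
            "ClientApplicationSecret", "BrokerApplicationId/.default");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Keeping the JAAS string in one helper avoids the line-continuation backslashes that the properties-file form needs.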