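I'm new to this. I'm building a Spark Structured Streaming job that uses the Kafka connector to consume Event Hubs events, and I followed an official documentation example that authenticates to Event Hubs with a service principal secret. When I run this code in a Synapse Notebook, it fails with the error that the class "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler" cannot be found. What am I doing wrong, or what am I missing?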
sasl_config = f'org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
    # Port 9093 is the Event Hubs Kafka port
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093",
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "startingOffsets": "earliest",
    "kafka.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
# Print each row; foreach runs on the executors, so the output lands in executor stdout/logs rather than the notebook cell
query = df.writeStream \
.foreach(lambda x: print(x)) \
.outputMode("append") \
.start()
# Wait for the query to terminate
query.awaitTermination()
On Databricks 14.3, I got your code working just by replacing the following classes:
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule => kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler => kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
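For reference, here is a sketch of the question's options with the prefix applied to both class names in one place, so the JAAS string and the callback handler cannot disagree. The helper `build_kafka_options` and its `shaded` flag are hypothetical names for illustration, not part of any Spark or Databricks API. The `kafkashaded.` prefix is only what worked on Databricks 14.3, whose bundled Kafka client is shaded; Synapse presumably ships the unshaded Apache client, so the prefix is likely not the fix there. Also note that `secured.OAuthBearerLoginCallbackHandler` was added in Apache Kafka 3.1 (KIP-768), so if the Kafka client bundled with your Spark pool is older than that, the class will be missing regardless of prefix.

```python
def build_kafka_options(event_hubs_server, event_hubs_topic, tenant_id,
                        client_id, client_secret, shaded=False):
    """Build Kafka source options for Event Hubs with a service principal.

    Hypothetical helper. When shaded=True, the Kafka class names get the
    "kafkashaded." prefix used by Databricks' bundled Kafka client.
    """
    prefix = "kafkashaded." if shaded else ""
    login_module = f"{prefix}org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"
    callback_handler = (
        f"{prefix}org.apache.kafka.common.security.oauthbearer.secured."
        "OAuthBearerLoginCallbackHandler"
    )
    # Same JAAS string as in the question, with the (possibly prefixed) login module
    sasl_config = (
        f'{login_module} required clientId="{client_id}" '
        f'clientSecret="{client_secret}" '
        f'scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
    )
    return {
        # Port 9093 is the Event Hubs Kafka port
        "kafka.bootstrap.servers": f"{event_hubs_server}:9093",
        "kafka.sasl.jaas.config": sasl_config,
        "kafka.sasl.oauthbearer.token.endpoint.url":
            f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
        "subscribe": event_hubs_topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "OAUTHBEARER",
        "startingOffsets": "earliest",
        "kafka.sasl.login.callback.handler.class": callback_handler,
    }


if __name__ == "__main__":
    # Placeholder values for illustration only
    opts = build_kafka_options("mynamespace.servicebus.windows.net", "mytopic",
                               "<tenant-id>", "<client-id>", "<client-secret>",
                               shaded=True)
    print(opts["kafka.sasl.login.callback.handler.class"])
    # In a notebook: spark.readStream.format("kafka").options(**opts).load()
```

Pass `shaded=True` on Databricks; on other platforms, keep the default and check which Kafka client version is actually on the classpath.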