I have the following simple code to read from our Azure EventHub, but I keep getting this error:
SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find data source: eventhubs.
connectionString = "Endpoint=sb://myspace.servicebus.windows.net/;SharedAccessKeyName=myaccesspolicy;SharedAccessKey=lfZTLiDv782xLCxxxxxxxxxxxxxAEhJ3XsFc=;EntityPath=myshub"
ehConf = {
'eventhubs.connectionString' : connectionString
}
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Any ideas why I can't connect to the Event Hub?
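The `eventhubs` data source is not built into Spark. It comes from the azure-eventhubs-spark library, so you need to attach that library to the cluster before `format("eventhubs")` can be resolved. However, it's better to connect to EventHubs through the built-in Kafka connector, which is more stable and performs better. Just use the correct configuration as described in this blog post (the EventHubs SAS connection string is read from a secret scope):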
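secret_scope = "scope"
secret_name = "eventhub_sas"
topic_name = "topic1"  # the Event Hub name; over Kafka each event hub is a topic
eh_namespace_name = "<eh-ns-name>"  # Event Hubs namespace, without .servicebus.windows.net
# the secret holds the Event Hubs SAS connection string
readConnectionString = dbutils.secrets.get(secret_scope, secret_name)
# Databricks ships a shaded Kafka client, hence the "kafkashaded." prefix;
# on open-source Spark use org.apache.kafka.common.security.plain.PlainLoginModule
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{readConnectionString}";'
# Event Hubs exposes its Kafka-compatible endpoint on port 9093
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"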
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "30000",
"startingOffsets": "earliest",
"kafka.sasl.jaas.config": eh_sasl,
"subscribe": topic_name,
}
df = spark.readStream.format("kafka") \
.options(**kafka_options).load()
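Both values this configuration needs, the namespace and the event hub name, are already present in a connection string like the one in the question. Below is a minimal sketch of a hypothetical helper, `kafka_options_from_connection_string`, that derives the same options from it. It assumes the usual `Endpoint=sb://<namespace>.servicebus.windows.net/;...;EntityPath=<hub>` format and a SAS policy with Listen rights on that event hub. It also assumes Databricks' shaded Kafka client by default (`shaded=False` drops the `kafkashaded.` prefix for open-source Spark). It only builds the options dict and does not touch Spark itself.

```python
from typing import Optional
from urllib.parse import urlparse


def parse_connection_string(conn_str: str) -> dict:
    """Split 'Key=Value;Key=Value;...' into a dict.

    Only the first '=' in each segment separates key from value, because
    base64-encoded SAS keys often end with '=' padding.
    """
    parts = {}
    for segment in conn_str.split(";"):
        segment = segment.strip()
        if not segment:
            continue  # tolerate a trailing ';'
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"malformed segment: {segment!r}")
        parts[key] = value
    return parts


def kafka_options_from_connection_string(conn_str: str,
                                         topic: Optional[str] = None,
                                         shaded: bool = True,
                                         starting_offsets: str = "earliest") -> dict:
    """Hypothetical helper: Spark Kafka-source options for an Event Hubs connection string."""
    parts = parse_connection_string(conn_str)
    # 'sb://<namespace>.servicebus.windows.net/' -> '<namespace>.servicebus.windows.net'
    host = urlparse(parts.get("Endpoint", "")).hostname
    # Fall back to EntityPath (the event hub name) when no topic is given
    topic = topic or parts.get("EntityPath")
    if not host or not topic:
        raise ValueError("need an Endpoint and either EntityPath or an explicit topic")
    # Databricks relocates the Kafka client classes under 'kafkashaded.'
    prefix = "kafkashaded." if shaded else ""
    jaas = (f"{prefix}org.apache.kafka.common.security.plain.PlainLoginModule"
            f' required username="$ConnectionString" password="{conn_str}";')
    return {
        "kafka.bootstrap.servers": f"{host}:9093",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.request.timeout.ms": "60000",
        "kafka.session.timeout.ms": "30000",
        "startingOffsets": starting_offsets,
        "kafka.sasl.jaas.config": jaas,
        "subscribe": topic,
    }
```

On Databricks you would then call it with the secret instead of a hard-coded string, e.g. `opts = kafka_options_from_connection_string(dbutils.secrets.get("scope", "eventhub_sas"))` followed by `spark.readStream.format("kafka").options(**opts).load()`. Passing `topic=` explicitly covers namespace-level connection strings, which have no `EntityPath`.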