Reading Azure Event Hubs from Databricks fails with 401: authorization failed because SAS authentication has been disabled for the namespace

Problem description

I am testing Azure Event Hubs and IoT Hub using this guide: https://www.databricks.com/notebooks/iiot/iiot-end-to-end-part-1.html

I created a secret containing the Event Hub-compatible connection string, which looks like this:

Endpoint=sb://iothub-ns-xxx-xxx-xxx.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=XXXX=;EntityPath=xxxx-iothub
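
For context, the ehConf used in the code below is built from that secret roughly as in the guide. The secret scope and key names here are placeholders for my own, and depending on the azure-event-hubs-spark connector version the connection string may need to be passed through EventHubsUtils.encrypt:

    # Build the Event Hubs configuration from the stored secret
    # (scope/key names below are placeholders; dbutils and sc are provided by Databricks)
    connectionString = dbutils.secrets.get(scope="iot", key="iothub-connection-string")

    ehConf = {
      # Recent azure-event-hubs-spark releases expect the connection string to be encrypted
      'eventhubs.connectionString':
          sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
    }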

I have verified in the Azure portal that local authentication is enabled on the Event Hubs namespace.

When I try to read data from the IoT Hub, I still get this error:

java.util.concurrent.ExecutionException: com.microsoft.azure.eventhubs.AuthorizationFailedException: status-code: 401, status-description: LocalAuthDisabled: Authorization failed because SAS authentication has been disabled for the namespace.

Here is the failing code:

    # F is pyspark.sql.functions; ehConf, CHECKPOINT_PATH and BRONZE_PATH are defined in earlier notebook cells
    from pyspark.sql import functions as F

    # Schema of incoming data from IoT hub
    schema = "timestamp timestamp, deviceId string, temperature double, humidity double, windspeed double, winddirection string, rpm double, angle double"
    
    # Read directly from IoT Hub using the EventHubs library for Databricks
    iot_stream = (
      spark.readStream.format("eventhubs")                                               # Read from IoT Hubs directly
        .options(**ehConf)                                                               # Use the Event-Hub-enabled connect string
        .load()                                                                          # Load the data
        .withColumn('reading', F.from_json(F.col('body').cast('string'), schema))        # Extract the "body" payload from the messages
        .select('reading.*', F.to_date('reading.timestamp').alias('date'))               # Create a "date" field for partitioning
    )
    
    # Split our IoT Hub stream into separate streams and write them both into their own Delta locations
    write_turbine_to_delta = (
      iot_stream.filter('temperature is null')                                           # Keep only turbine telemetry (rows without weather fields)
        .select('date','timestamp','deviceId','rpm','angle')                             # Extract the fields of interest
        .writeStream.format('delta')                                                     # Write our stream to the Delta format
        .partitionBy('date')                                                             # Partition our data by Date for performance
        .option("checkpointLocation", CHECKPOINT_PATH + "turbine_raw")                   # Checkpoint so we can restart streams gracefully
        .start(BRONZE_PATH + "turbine_raw")                                              # Stream the data into an ADLS Path
    )
    
    write_weather_to_delta = (
      iot_stream.filter(iot_stream.temperature.isNotNull())                              # Keep only weather telemetry
        .select('date','deviceid','timestamp','temperature','humidity','windspeed','winddirection') 
        .writeStream.format('delta')                                                     # Write our stream to the Delta format
        .partitionBy('date')                                                             # Partition our data by Date for performance
        .option("checkpointLocation", CHECKPOINT_PATH + "weather_raw")                   # Checkpoint so we can restart streams gracefully
        .start(BRONZE_PATH + "weather_raw")                                              # Stream the data into an ADLS Path
    )
    
    # Create the external tables once data starts to stream in
    while True:
      try:
        spark.sql(f'CREATE TABLE IF NOT EXISTS turbine_raw USING DELTA LOCATION "{BRONZE_PATH + "turbine_raw"}"')
        spark.sql(f'CREATE TABLE IF NOT EXISTS weather_raw USING DELTA LOCATION "{BRONZE_PATH + "weather_raw"}"')
        break
      except Exception:   # Delta locations not written yet; keep retrying
        pass
azure authentication azure-iot-hub azure-eventhub
1 Answer

I found the problem. In my Terraform configuration, the azurerm_iothub resource was created with local_authentication_enabled = false. Changing it to true fixed the error. It is still strange that the setting looked enabled in the portal, but perhaps I simply misread it.
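
For anyone hitting the same 401, the relevant part of the Terraform resource looks roughly like this; the resource names, resource group references and SKU below are placeholders, and only local_authentication_enabled is the setting that matters:

    resource "azurerm_iothub" "example" {
      name                = "xxxx-iothub"                                # placeholder names
      resource_group_name = azurerm_resource_group.example.name
      location            = azurerm_resource_group.example.location

      sku {
        name     = "S1"
        capacity = 1
      }

      # Defaults to true; setting it to false disables SAS authentication on the
      # built-in Event Hub-compatible namespace and produces the
      # LocalAuthDisabled / 401 error shown above
      local_authentication_enabled = true
    }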
