I'm testing Azure Event Hubs and IoT Hub using this guide: https://www.databricks.com/notebooks/iiot/iiot-end-to-end-part-1.html
I created a secret containing the Event Hub-compatible connection string, which looks like this:
Endpoint=sb://iothub-ns-xxx-xxx-xxx.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=XXXX=;EntityPath=xxxx-iothub
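As a sanity check on the secret's value, the connection string splits cleanly into its parts. A minimal plain-Python sketch (`parse_eventhub_connection_string` is a hypothetical helper, and the values are the redacted placeholders from above):

```python
def parse_eventhub_connection_string(conn_str: str) -> dict:
    """Split an Event Hub-compatible connection string into its key=value parts."""
    parts = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue
        # partition() splits at the FIRST '=', so base64 keys ending in '=' survive intact
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts

conn = ("Endpoint=sb://iothub-ns-xxx-xxx-xxx.servicebus.windows.net/;"
        "SharedAccessKeyName=iothubowner;"
        "SharedAccessKey=XXXX=;"
        "EntityPath=xxxx-iothub")
parsed = parse_eventhub_connection_string(conn)
```

The `EntityPath` here must be the IoT Hub's built-in endpoint name, not an Event Hubs entity you created yourself.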
I have verified that local authentication is enabled on the Event Hubs namespace:
When I try to read data from the IoT Hub, I still get this error:
java.util.concurrent.ExecutionException: com.microsoft.azure.eventhubs.AuthorizationFailedException: status-code: 401, status-description: LocalAuthDisabled: Authorization failed because SAS authentication has been disabled for the namespace.
Here is the code that fails:

# Schema of incoming data from IoT Hub
from pyspark.sql import functions as F  # needed for F.from_json / F.to_date below

schema = "timestamp timestamp, deviceId string, temperature double, humidity double, windspeed double, winddirection string, rpm double, angle double"
# Read directly from IoT Hub using the EventHubs library for Databricks
iot_stream = (
  spark.readStream.format("eventhubs")                                          # Read from IoT Hub directly
    .options(**ehConf)                                                          # Use the Event Hub-compatible connection string
    .load()                                                                     # Load the data
    .withColumn('reading', F.from_json(F.col('body').cast('string'), schema))   # Extract the "body" payload from the messages
    .select('reading.*', F.to_date('reading.timestamp').alias('date'))          # Create a "date" field for partitioning
)
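To make the `from_json` step above concrete, here is a plain-Python sketch of what happens to one message body (the sample payload is made up; field names come from the schema above):

```python
import json
from datetime import datetime

# A made-up weather message body matching the schema above
body = json.dumps({
    "timestamp": "2024-01-01T00:00:00",
    "deviceId": "WeatherCapture",
    "temperature": 21.5,
    "humidity": 60.0,
    "windspeed": 7.2,
    "winddirection": "NW",
})

reading = json.loads(body)                                     # from_json: body -> struct
date = datetime.fromisoformat(reading["timestamp"]).date()     # to_date: the 'date' partition column
```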
# Split our IoT Hub stream into separate streams and write them both into their own Delta locations
write_turbine_to_delta = (
  iot_stream.filter('temperature is null')                           # Keep only turbine telemetry (weather rows carry temperature)
    .select('date','timestamp','deviceId','rpm','angle')             # Extract the fields of interest
    .writeStream.format('delta')                                     # Write our stream to the Delta format
    .partitionBy('date')                                             # Partition our data by date for performance
    .option("checkpointLocation", CHECKPOINT_PATH + "turbine_raw")   # Checkpoint so we can restart streams gracefully
    .start(BRONZE_PATH + "turbine_raw")                              # Stream the data into an ADLS path
)
write_weather_to_delta = (
  iot_stream.filter(iot_stream.temperature.isNotNull())              # Keep only weather telemetry
    .select('date','deviceId','timestamp','temperature','humidity','windspeed','winddirection')
    .writeStream.format('delta')                                     # Write our stream to the Delta format
    .partitionBy('date')                                             # Partition our data by date for performance
    .option("checkpointLocation", CHECKPOINT_PATH + "weather_raw")   # Checkpoint so we can restart streams gracefully
    .start(BRONZE_PATH + "weather_raw")                              # Stream the data into an ADLS path
)
# Create the external tables once data starts to stream in
import time

while True:
    try:
        spark.sql(f'CREATE TABLE IF NOT EXISTS turbine_raw USING DELTA LOCATION "{BRONZE_PATH + "turbine_raw"}"')
        spark.sql(f'CREATE TABLE IF NOT EXISTS weather_raw USING DELTA LOCATION "{BRONZE_PATH + "weather_raw"}"')
        break
    except Exception:
        time.sleep(5)  # Delta locations don't exist until the streams write data; retry instead of spinning
I found the problem. The Terraform that creates the azurerm_iothub had local_authentication_enabled = false set. Changing it to true fixed it. It's still odd that the setting looked enabled in the portal — possibly I was looking at the wrong resource: the connection string points at the IoT Hub's built-in Event Hubs-compatible endpoint, whose SAS auth is governed by the IoT Hub resource itself rather than by a standalone Event Hubs namespace.
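For anyone hitting the same error, the relevant attribute sits on the IoT Hub resource itself. A sketch of the Terraform fix (resource names and SKU here are placeholders):

```hcl
resource "azurerm_iothub" "example" {
  name                = "example-iothub"
  resource_group_name = azurerm_resource_group.example.name
  location            = azurerm_resource_group.example.location

  # When this is false, SAS (connection-string) auth against the built-in
  # Event Hubs-compatible endpoint fails with 401 LocalAuthDisabled.
  local_authentication_enabled = true

  sku {
    name     = "S1"
    capacity = 1
  }
}
```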