Unable to view Event Hubs-generated data in Databricks. Error: java.lang.IllegalArgumentException: Input byte array has wrong 4-byte ending unit


I generated dummy yellow taxi data in my Event Hub using Generate data (preview).

When I try to view (display) the streaming data in Databricks using the code below, I get this error:

ERROR: Some streams terminated before this command could finish!
java.lang.IllegalArgumentException: Input byte array has wrong 4-byte ending unit

The code I use to read the stream is:

connectionString = 'Endpoint=sb://carlsspace.servicebus.windows.net/;SharedAccessKeyName=myaccesspolicy;SharedAccessKey=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXwRM+AEhJ3XsFc=;EntityPath=carlshub'
ehConf = {
  'eventhubs.connectionString' : connectionString
}

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()
df = df.withColumn("body", df["body"].cast("string"))
display(df)

I should mention that I am trying to read the stream on Databricks Community Edition.

Update:

I changed the code as follows:

connectionString = 'Endpoint=sb://carlsspace.servicebus.windows.net/;SharedAccessKeyName=myaccesspolicy;SharedAccessKey=lfZTXXXXXXXXXXXXXXXXXXXVaNv9wRM+AEhJ3XsFc=;EntityPath=carlshub'

ehConf = {}

# For versions before 2.3.15, set the connection string without encryption
# ehConf['eventhubs.connectionString'] = connectionString

# For 2.3.15 version and above, the configuration dictionary requires that connection string be encrypted.
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()
df = df.withColumn("body", df["body"].cast("string"))
display(df)

But I am now getting the following message:

ERROR: Some streams terminated before this command could finish!
java.lang.IllegalArgumentException: Input byte array has wrong 4-byte ending unit

pyspark azure-databricks azure-eventhub
1 answer
0 votes

Regarding the error you are getting: you need to encrypt the connection string in the configuration dictionary:

ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
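
For context, "Input byte array has wrong 4-byte ending unit" is the message Java's `java.util.Base64` decoder raises when its input is not well-formed Base64. One plausible reading (an assumption, not confirmed in this thread) is that the connector tries to Base64-decode the configured value, which cannot work for a plain-text connection string. The sketch below only illustrates that idea with Python's standard library; `looks_like_base64` is a hypothetical helper, the connection string is a placeholder, and Python's own error text differs from Java's.

```python
import base64
import binascii


def looks_like_base64(value: str) -> bool:
    """Return True if `value` decodes as strict Base64 (hypothetical helper)."""
    try:
        # validate=True rejects characters outside the Base64 alphabet
        # instead of silently discarding them.
        base64.b64decode(value, validate=True)
        return True
    except (binascii.Error, ValueError):
        return False


# Placeholder connection string, not a real credential.
plain = (
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=policy;SharedAccessKey=abc123=;EntityPath=hub"
)

# Stand-in for an encrypted value: Base64 text of arbitrary bytes.
encoded = base64.b64encode(b"ciphertext bytes").decode("ascii")

print(looks_like_base64(plain))    # False
print(looks_like_base64(encoded))  # True
```

A check like this cannot tell you whether a value was produced by `EventHubsUtils.encrypt`; it only shows why a raw `Endpoint=sb://...` string would fail a Base64 decode.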

You also need to install the following Maven library on the cluster:

com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22 
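
The code comments in the question say that connector versions 2.3.15 and above expect an encrypted connection string, and by the usual Scala artifact convention the `_2.12` suffix in the coordinate is the Scala version the library was built for (which should match the cluster's Scala version). As a minimal sketch of checking both from the coordinate, with hypothetical helper names `parse_coordinate` and `requires_encrypted_connection_string`:

```python
def parse_coordinate(coordinate: str):
    """Split 'group:artifact_scalaVersion:version' into its four parts."""
    group, artifact, version = coordinate.strip().split(":")
    # The Scala version follows the last underscore in the artifact name.
    name, _, scala_version = artifact.rpartition("_")
    return group, name, scala_version, version


def requires_encrypted_connection_string(version: str) -> bool:
    """True for 2.3.15 and above, per the comment in the question's code."""
    parts = tuple(int(p) for p in version.split("."))
    return parts >= (2, 3, 15)


group, name, scala_version, version = parse_coordinate(
    "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22"
)
print(scala_version)                                    # 2.12
print(requires_encrypted_connection_string(version))    # True
print(requires_encrypted_connection_string("2.3.14"))   # False
```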


Make sure your Event Hub has active event messages.
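
If the events were generated before the stream was started, it may also help to state explicitly where reading should begin. As I understand the connector's PySpark documentation, the `eventhubs.startingPosition` option takes a JSON string, with offset `"-1"` meaning the start of the stream. Treat this as a sketch under that assumption; `starting_position_from_beginning` is a hypothetical helper name.

```python
import json


def starting_position_from_beginning() -> str:
    """Build the JSON string for reading from the start of the stream."""
    position = {
        "offset": "-1",        # "-1" is taken to mean start of stream
        "seqNo": -1,           # not in use
        "enqueuedTime": None,  # not in use
        "isInclusive": True,
    }
    return json.dumps(position)


ehConf = {}
ehConf["eventhubs.startingPosition"] = starting_position_from_beginning()
print(ehConf["eventhubs.startingPosition"])
```

The resulting entry would sit in the same `ehConf` dictionary as the encrypted connection string, before the dictionary is passed to `.options(**ehConf)`.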

You can try the following:

namespaceName = "dilieepevnt"
EVENTHUB_NAME = "dileepeventhubnamespc" 
SharedAccessKeyName = "RootManageSharedAccessKey"
SharedAccessKey = "SimgL2vbFb6uMSdDpeSPHzvIf0sZcP3FR+AEhGa8JN0="
connectionString = f"Endpoint=sb://{namespaceName}.servicebus.windows.net/;SharedAccessKeyName={SharedAccessKeyName};SharedAccessKey={SharedAccessKey};EntityPath={EVENTHUB_NAME}"
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()
readInStreamBody = df.withColumn("body", df["body"].cast("string"))
display(readInStreamBody)
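
Since a malformed connection string is easy to miss, here is a small sketch for assembling it and checking its parts before it is encrypted. Both `build_connection_string` and `parse_connection_string` are hypothetical helpers, and the values shown are placeholders rather than real credentials.

```python
def build_connection_string(namespace, key_name, key, event_hub):
    """Assemble an Event Hubs connection string, rejecting empty parts."""
    fields = {
        "namespace": namespace,
        "key_name": key_name,
        "key": key,
        "event_hub": event_hub,
    }
    for label, value in fields.items():
        if not value or not value.strip():
            raise ValueError(f"{label} must not be empty")
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key};"
        f"EntityPath={event_hub}"
    )


def parse_connection_string(connection_string):
    """Split into a dict; only the first '=' separates key from value,
    because Base64 access keys can themselves end in '='."""
    return dict(
        part.split("=", 1) for part in connection_string.split(";") if part
    )


cs = build_connection_string("mynamespace", "mypolicy", "abc123+xyz=", "myhub")
parts = parse_connection_string(cs)
print(parts["EntityPath"])       # myhub
print(parts["SharedAccessKey"])  # abc123+xyz=
```

Checking that `EntityPath` holds the event hub name (not the namespace name) is a quick sanity test before calling `EventHubsUtils.encrypt`.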

