I am running the following code in an Azure Databricks Python notebook:
TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhubns.servicebus.windows.net/;SharedAccessKeyName=MyKeyName;SharedAccessKey=myaccesskey;\";"
df = spark.readStream \
.format("kafka") \
.option("subscribe", TOPIC) \
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.request.timeout.ms", "60000") \
.option("kafka.session.timeout.ms", "60000") \
.option("failOnDataLoss", "false") \
.option("startingOffsets", "earliest") \
.load()
df_write = df.writeStream \
.outputMode("append") \
.format("console") \
.start() \
.awaitTermination()
This shows no output in the notebook. How can I debug what is going wrong?
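If you use .format("console"), the output does not appear in the notebook. It goes to the driver and executor logs instead; this is a difference between Spark and Databricks.
If you want to see the data, just use the display function: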
display(df)
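On the "how do I debug" part: the handle returned by start() exposes isActive, status, lastProgress and exception(), which usually show whether the stream is connected and whether any rows are arriving. Below is a minimal sketch, assuming you keep that handle instead of chaining .awaitTermination(), and assuming status and lastProgress can be read like dicts (as they can in PySpark 3.x). The helper name summarize_query is my own, not a Spark API.

```python
def summarize_query(query):
    """Collect basic health indicators of a StreamingQuery into a dict.

    `query` is expected to be the object returned by writeStream.start().
    Only documented StreamingQuery members are used: isActive, status,
    lastProgress and exception().
    """
    progress = query.lastProgress  # None until the first micro-batch completes
    error = query.exception()      # None unless the query has failed
    return {
        "active": query.isActive,
        "message": query.status["message"],
        "rows_in_last_batch": progress["numInputRows"] if progress else None,
        "error": str(error) if error is not None else None,
    }


# Hypothetical usage in the notebook (not executed here):
# query = df.writeStream.outputMode("append").format("console").start()
# print(summarize_query(query))
#
# The Kafka `value` column is binary, so cast it before displaying:
# display(df.selectExpr("CAST(value AS STRING) AS body"))
```

If rows_in_last_batch stays at None or 0 while the query is active, the connection settings (bootstrap server, SASL config, topic name) are the first thing I would check.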
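This code is now writing data with very low latency. When I run a SELECT in the SQL warehouse, the newest data point is about 10 seconds old. The remaining problem is that foreachBatch is not running, although the write itself works.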
TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"
df = spark.readStream \
.format("kafka") \
.option("subscribe", TOPIC) \
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.request.timeout.ms", "60000") \
.option("kafka.session.timeout.ms", "60000") \
.option("failOnDataLoss", "false") \
.option("startingOffsets", "earliest") \
.load()
n = 100
count = 0
def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")
...Omitted code where I transform the data in the value column to strongly typed data...
myTypedDF.writeStream \
.foreachBatch(run_command) \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
.partitionBy("somecolumn") \
.toTable("myunitycatalog.bronze.mytable")
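On foreachBatch not running: as far as I know, a streaming write has a single sink, so the later .format("delta") and .toTable(...) most likely replace the foreachBatch sink and run_command is never called. One possible way around it is to move the Delta write into the batch function. The sketch below assumes that approach and uses the batch id (the second argument) instead of the global counter. make_batch_handler and its parameters are hypothetical names of mine, and the partitionBy("somecolumn") from the post is left out for brevity.

```python
def make_batch_handler(spark, target_table, optimize_every=100,
                       zorder_col="readtimestamp"):
    """Build a foreachBatch function that appends each micro-batch to a
    Delta table and runs OPTIMIZE on every `optimize_every`-th batch."""

    def handle_batch(batch_df, batch_id):
        # Inside foreachBatch, batch_df is a regular (non-streaming)
        # DataFrame, so the batch writer API applies.
        batch_df.write.format("delta").mode("append").saveAsTable(target_table)

        # With a fresh checkpoint, batch ids start at 0 and grow by one per
        # micro-batch, so this fires on batch optimize_every - 1,
        # 2 * optimize_every - 1, and so on.
        if (batch_id + 1) % optimize_every == 0:
            spark.sql(f"OPTIMIZE {target_table} ZORDER BY ({zorder_col})")

    return handle_batch


# Hypothetical usage in the notebook (not executed here):
# myTypedDF.writeStream \
#     .foreachBatch(make_batch_handler(spark, "myunitycatalog.bronze.mytable")) \
#     .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
#     .start()
```

Using the batch id rather than a Python global means the schedule does not reset to zero when the notebook is re-run against the same checkpoint.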