Reading from Azure Event Hubs with the Kafka driver doesn't seem to get any data

Problem description · 0 votes · 2 answers

I'm running the following code in an Azure Databricks Python notebook:

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhubns.servicebus.windows.net/;SharedAccessKeyName=MyKeyName;SharedAccessKey=myaccesskey;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

df_write = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start() \
    .awaitTermination()

This produces no output in the notebook. How can I debug what the problem is?
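
For reference, one way to check whether the stream is making any progress (a minimal sketch; it assumes you drop awaitTermination() so the cell doesn't block, which is not in the original code) is to poll the running query's status and last progress:

import time

# Start the query without blocking the notebook cell
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Poll a few times to see whether offsets are actually being consumed
for _ in range(5):
    time.sleep(10)
    print(query.status)        # e.g. waiting for data vs. actively processing
    print(query.lastProgress)  # per-trigger metrics such as numInputRows

If numInputRows stays at 0, the connection and authentication settings are the first thing to check in the driver logs.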

apache-spark apache-kafka azure-databricks spark-structured-streaming azure-eventhub
2 Answers

1 vote

If you use .format("console"), the output won't show up in the notebook - it goes to the driver and executor logs. That's a difference between plain Spark and Databricks.

If you want to see the data, just use the display function:

display(df)
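
Note that the Kafka source returns key and value as binary, so to make the displayed rows readable you would typically cast them first (a small sketch; the selected columns are just the standard Kafka source columns, not something from the original answer):

# key/value arrive as binary; cast them to strings before displaying
decoded = df.selectExpr(
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value",
    "topic",
    "partition",
    "offset",
    "timestamp",
)
display(decoded)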

0 votes

This code is now writing data with very low latency - when I run a SELECT from the SQL warehouse, the newest data point is about 10 seconds old. The remaining problem is that the foreachBatch function never runs, but otherwise it is working.

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

n = 100
count = 0

# Intended to trigger an OPTIMIZE on the target table every n micro-batches
def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")

...Omitted code where I transform the data in the value column to strongly typed data...

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")