Attempted to update an empty pipeline.
This error usually means that no tables were discovered in your specified Notebook libraries.
Please verify that your Notebook libraries include table definitions.
DLT笔记本代码
%python
import dlt
@dlt.table
def kafka_raw():
TOPIC = "<event_hub>"
BOOTSTRAP_SERVERS = "<event_hub>.servicebus.windows.net:9093"
EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<event_hub>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<event_hub shared_access_policy primary_key>;";'
return (
spark.readStream.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "60000")
.option("failOnDataLoss", "false")
.option("startingOffsets", "earliest")
.load()
)
正如@Alex Ott所说,检查源代码路径,是否使用具有
dlt
定义的笔记本。
下面是普通的笔记本。
然后,我已将此笔记本源代码路径提供给 dlt 管道。
运行后我遇到了和你一样的错误。 因此,在 Source code 选项中给出具有
@dlt.table
装饰器的正确笔记本路径。
我换了原来的笔记本。
然后得到了预期的输出。
输出: