我能够开发一个管道,从 kafka 读取数据并进行一些转换,并将输出写入 kafka 接收器以及 parque 接收器。我想添加有效的日志记录来记录转换的中间结果,就像在常规流应用程序中一样。
我看到的一个选项是通过
记录
queryExecutionstreams
df.queryExecution.analyzed.numberedTreeString
或
logger.info("Query progress"+ query.lastProgress)
logger.info("Query status"+ query.status)
但这似乎没有办法查看流正在其上运行的业务特定消息。
有没有办法添加更多日志信息,例如正在处理的数据?
我找到了一些跟踪相同内容的选项。基本上我们可以使用
df.writeStream.format("parquet") .queryName("table1")
命名我们的流查询。
查询名称
table1
将打印在Spark作业选项卡中,与Spark UI中的已完成作业列表相对应,您可以从中跟踪每个流查询的状态
ProgressReporter
API 来收集更多统计数据。