[我正在使用Spark结构化流媒体,以使用kafka主题中的数据,并将数据写入另一个kafka接收器。
我想存储两次偏移-从主题读取一次并搅动偏移。其次,在将数据写入输出接收器并写入偏移量时,可以通过指定检查点目录位置来实现,
是否可以写出订阅主题期间消耗的偏移量。
您可以使用StreamingQueryListener。您可以通过
将侦听器添加到流中spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
// insert code here to log the offsets in addition to Spark's checkpoint
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
})