我有一个 Spark Streaming 应用程序,可以从多个 Kafka 主题读取数据。每个主题都有不同类型的数据,因此需要不同的处理管道。
我最初的解决方案是为每个主题创建一个 DStream:
def main(args: Array[String]) {
val streamingContext: StreamingContext = ...
val topics = ...
for (topic <- topics) {
val offsets: Map[TopicAndPartition, Long] = ...
val stream = KafkaUtils.createDirectStream[...](streamingContext, kafkaProperties, offsets, ...)
configureStream(topic, stream)
}
streamingContext.addStreamingListener(new StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
// logic to save offsets after each batch completes
}
})
streamingContext.start()
streamingContext.awaitTermination()
}
def configureStream(topic: String, stream: DStream[...]) {
topic match {
case "first" => stream.map(...).foreachRDD(...)
case "second" => stream.map(...).foreachRDD(...)
case "third" => stream.map(...).foreachRDD(...)
// ...
}
}
运行应用程序时,处理作业会被一个接一个地计算,即使它们最初属于不同的 DStream..
我尝试调整
spark.streaming.concurrentJobs
参数(如此处所述),但这就是事情变得奇怪的时候:
这可能会导致问题,例如在管理 Kafka 偏移量时 - 流侦听器首先获取第二/第三批的偏移量(因为它首先完成)并保存它们。如果应用程序在完成第一批之前崩溃,则该数据将丢失。在另一种情况下,如果第一批完成并且应用程序随后崩溃,则将重播第二/第三批中的数据。
有没有办法告诉 Spark 并行处理作业而不处理新批次?或者,也许并行处理不同的 DStream(即,一个 DStream 中的作业是线性处理的;跨不同的 DStream 是并行处理的)?
Spark 结构化流似乎正在解决这个问题。我稍后会分享示例代码。同时可以浏览this答案