并行运行不同DStream的多个Spark Streaming作业

问题描述 投票:0回答:1

我有一个 Spark Streaming 应用程序,可以从多个 Kafka 主题读取数据。每个主题都有不同类型的数据,因此需要不同的处理管道。

我最初的解决方案是为每个主题创建一个 DStream:

def main(args: Array[String]) { 
    val streamingContext: StreamingContext = ...
    val topics = ...

    for (topic <- topics) {
        val offsets: Map[TopicAndPartition, Long] = ...
        val stream = KafkaUtils.createDirectStream[...](streamingContext, kafkaProperties, offsets, ...)
        configureStream(topic, stream)
    }

    streamingContext.addStreamingListener(new StreamingListener {
        override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
            // logic to save offsets after each batch completes
        }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
}


def configureStream(topic: String, stream: DStream[...]) {
    topic match {
        case "first" => stream.map(...).foreachRDD(...)
        case "second" => stream.map(...).foreachRDD(...)
        case "third" => stream.map(...).foreachRDD(...)
        // ...
    }
}

运行应用程序时,处理作业会被一个接一个地计算,即使它们最初属于不同的 DStream..

我尝试调整

spark.streaming.concurrentJobs
参数(如此处所述),但这就是事情变得奇怪的时候:

  • 第一批正在处理更多数据(这是因为当流应用程序关闭时数据会累积在 Kafka 中)。处理时间比指定的批次间隔长。
  • 第二批已添加到队列中(第一批仍在运行),并立即开始处理。
  • 第二批(有时甚至是第三批)在第一批之前完成。

这可能会导致问题,例如在管理 Kafka 偏移量时 - 流侦听器首先获取第二/第三批的偏移量(因为它首先完成)并保存它们。如果应用程序在完成第一批之前崩溃,则该数据将丢失。在另一种情况下,如果第一批完成并且应用程序随后崩溃,则将重播第二/第三批中的数据。

有没有办法告诉 Spark 并行处理作业而不处理新批次?或者,也许并行处理不同的 DStream(即,一个 DStream 中的作业是线性处理的;跨不同的 DStream 是并行处理的)?

scala apache-spark spark-streaming
1个回答
0
投票

Spark 结构化流似乎正在解决这个问题。我稍后会分享示例代码。同时可以浏览this答案

© www.soinside.com 2019 - 2024. All rights reserved.