使用 Scala 在 Spark 流应用程序中编写优化 UDF 的最佳方式是什么?

问题描述 投票:0回答:1

我正在使用 Spark Streaming 应用程序,我需要使用来自一个 Kafka 主题的数据,并且需要推送到另一个 Kafka 主题。

我创建了一个 UDF 函数,它执行一些内置 Spark SQL/其他函数无法使用的业务逻辑

Object TestingObject Extetnds Serializble{

def userdefined_function(String:row_string):String = {
return "Data After Business Logic"
}

def main(args: Array[String]): Unit = {
kafkaStream.foreachRDD(foreachFunc = rdd => {
      if (!rdd.isEmpty()) {
val df = ss.read.option("mode", "DROPMALFORMED").json(ss.createDataset(newRDD)(Encoders.STRING))
        val Enricheddf = df.toJSON.foreach(row => {
val data = userdefined_function(row);
kafkaproducer.send(topicname,data)
})
}}
}

我知道在spark应用程序中使用UDF非常昂贵。但是在我的业务逻辑中我没有其他方法,所以我应该在我的应用程序中使用。

我的问题是如何优化 Spark Scala 流应用程序中的 My UDF 函数?

我可以在main函数中使用UDF吗?或者 我可以在 foreach 函数(每行)中使用 UDF 吗?或者 我可以将 UDF 放在不同的类中并使用 Spark 广播该类吗?或者 我应该怎么办。 ? 有人可以为此提出建议吗? 预先感谢。

scala apache-spark spark-streaming user-defined-functions
1个回答
0
投票

我会尽力澄清一些要点:

在代码中您应该了解 Spark 的几个主要概念:

  1. 与其他语言一样,主函数是应用程序的入口点,因此您是否可以在主函数中使用 UDF 的问题,是的,您可以在那里使用任何您想要的东西。

  2. UDF的概念应用于Spark SQL世界。这意味着这个概念与 Spark Dataframes 密切相关。

  3. 您正在使用旧的 Spark Streaming 实现。通常,您应该使用 Spark Structured Streaming API。您使用的 Spark Streaming 规范是基于 RDD api 构建的。对于每个小批量,您可以将传入消息作为 RDD 进行操作,这里没有 UDF,您可以将普通的 Scala 函数应用于每个小批量。

  4. 不要为每个小批量创建一个新的数据帧。你不需要这样做。您的数据已经分布在执行者之间。例如,您可以使用 RDD 的映射函数在 foreachRDD 中使用纯 Scala 代码来应用您想要的任何内容。想象一下,如果您有数千个小批量......

  5. 与 UDF 相关。它们非常有用,您必须考虑到它们是 Spark 优化器的黑匣子,因为您可以在其中使用您想要的任何内容,并且 Spark 将无法检查您的代码以按照它认为的方式创建执行计划将是最有效的方法。

  6. 当您使用 UDF 时,Spark 必须将数据表示从 Spark 序列化/反序列化为 Scala 类型,反之亦然,因此存在额外成本,但这并不意味着您必须避免它们,有时它们非常有用。除了额外的 GC 开销之外。因此,请避免在其中使用重对象,例如使用普通数组,而不是元组或大写类。

© www.soinside.com 2019 - 2024. All rights reserved.