What is the best way to create Spark DataFrames with the Spark Testing Base library?

I am writing unit tests for a Spark method that takes several DataFrames as input parameters and returns a DataFrame. The method looks like this:

import org.apache.spark.sql.DataFrame

class Processor {
  def process(df1: DataFrame, df2: DataFrame): DataFrame = {
    // process and return resulting data frame
    ???  // placeholder so the snippet compiles
  }
}

The existing code for the corresponding unit test is as follows:

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.DataFrame
import org.scalatest.{FlatSpec, Matchers}

class TestProcess extends FlatSpec with DataFrameSuiteBase with Matchers {

  val p:Processor = new Processor

  "process()" should "return only one row" in {
    // Each row is a tuple so that Spark can infer a schema.
    val df1RDD = sc.parallelize(Seq(
      ("a", 12, 98999),
      ("b", 42, 99)
    ))
    val df1 = spark.createDataFrame(df1RDD).toDF()

    val df2RDD = sc.parallelize(Seq(
      ("X", 12, "foo", "spark"),
      ("Z", 42, "bar", "storm")
    ))
    val df2 = spark.createDataFrame(df2RDD).toDF()

    val result = p.process(df1, df2)
  }

  it should "return spark row" in {
    // Same fixture setup as in the previous test (this is the duplication in question).
    val df1RDD = sc.parallelize(Seq(
      ("a", 12, 98999),
      ("b", 42, 99)
    ))
    val df1 = spark.createDataFrame(df1RDD).toDF()

    val df2RDD = sc.parallelize(Seq(
      ("X", 12, "foo", "spark"),
      ("Z", 42, "bar", "storm")
    ))
    val df2 = spark.createDataFrame(df2RDD).toDF()

    val result = p.process(df1, df2)
  }
}

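This code works fine, but the code that creates the RDDs and DataFrames is duplicated in every test method. When I try to create the RDDs outside the test methods, or inside a BeforeAndAfterAll method, I get an error saying that sc is not available. It seems that the Spark Testing Base library only initializes the sc and spark variables inside the test methods.

Is there any way to avoid writing this duplicated code?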


Updated code after switching from FlatSpec to WordSpec:

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.DataFrame
import org.scalamock.scalatest.MockFactory
import org.scalatest.{Matchers, WordSpec}

class TestProcess extends WordSpec with DataFrameSuiteBase with Matchers {

  val p:Processor = new Processor

  "process()" should {
    // Shared setup, written once for all clauses below.
    val df1RDD = sc.parallelize(Seq(
      ("a", 12, 98999),
      ("b", 42, 99)
    ))
    val df1 = spark.createDataFrame(df1RDD).toDF()

    val df2RDD = sc.parallelize(Seq(
      ("X", 12, "foo", "spark"),
      ("Z", 42, "bar", "storm")
    ))
    val df2 = spark.createDataFrame(df2RDD).toDF()
    val result = p.process(df1, df2)

    "return only one row" in {             
      result.count should equal(1)
    }

    "return spark row" in {
      // assertions to check if 'row' containing 'spark' in last column is in the result or not
    }
  }
}
scala unit-testing apache-spark spark-dataframe
1 answer
3 votes

Use WordSpec instead of FlatSpec, because it lets you group common initialization ahead of the test clauses, as in:

"process()" should {
     val df1RDD = sc.parallelize(Seq(("a", 12, 98999), ("b", 42, 99)))
     val df1 = spark.createDataFrame(df1RDD).toDF()
     val df2RDD = sc.parallelize(Seq(("X", 12, "foo", "spark"), ("Z", 42, "bar", "storm")))
     val df2 = spark.createDataFrame(df2RDD).toDF()
     "return only one row" in {
         ....
     }
     "return spark row" in {
         ....
     }
}

Edit: Also, given the following two lines of code, there is hardly any reason to use a library (spark-testing-base) at all:

val spark = SparkSession.builder.master("local[1]").getOrCreate
val sc = spark.sparkContext

Add these at the top of your test class and you have a SparkContext and everything else set up, with no NPE.
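Putting those two lines together with the WordSpec grouping, the suite might look roughly like the sketch below. It assumes ScalaTest 3.0.x (where `WordSpec` and `Matchers` live directly in `org.scalatest`, as in the question). The `Processor` body, the column names, and the app name are hypothetical stand-ins chosen only so the example runs; they are not the asker's real logic.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.scalatest.{Matchers, WordSpec}

// Hypothetical stand-in for the class under test: joins on "id"
// and keeps only the rows whose "engine" column is "spark".
class Processor {
  def process(df1: DataFrame, df2: DataFrame): DataFrame =
    df1.join(df2, "id").filter(col("engine") === "spark")
}

class TestProcess extends WordSpec with Matchers {

  // Created eagerly as a plain val, so it already exists while
  // WordSpec evaluates the body of `should { ... }`.
  val spark: SparkSession =
    SparkSession.builder.master("local[1]").appName("TestProcess").getOrCreate()
  import spark.implicits._

  val p = new Processor

  "process()" should {
    // Shared fixtures, built once for every clause below.
    val df1 = Seq(("a", 12, 98999), ("b", 42, 99)).toDF("name", "id", "value")
    val df2 = Seq(("X", 12, "foo", "spark"), ("Z", 42, "bar", "storm"))
      .toDF("code", "id", "word", "engine")
    val result = p.process(df1, df2)

    "return only one row" in {
      result.count() shouldBe 1L
    }

    "return spark row" in {
      result.select("engine").as[String].collect() should contain("spark")
    }
  }
}
```

The session is never stopped here, so it lives until the test JVM exits; since `getOrCreate` reuses an existing session, other suites in the same run would share it.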

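Edit: I just confirmed with my own test that spark-testing-base is not compatible with WordSpec. If you still want to use it, consider opening a bug report with the library author, since this is definitely an issue in spark-testing-base.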
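If you would rather keep spark-testing-base, one possible workaround (not part of the original answer) is to declare the shared fixtures as `lazy val`s. This assumes the library creates its context in `beforeAll()`, so a fixture that is first read inside a test body sees an initialized `spark`. The `Processor` body and the column names below are again hypothetical stand-ins.

```scala
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.scalatest.{FlatSpec, Matchers}

// Hypothetical stand-in for the class under test.
class Processor {
  def process(df1: DataFrame, df2: DataFrame): DataFrame =
    df1.join(df2, "id").filter(col("engine") === "spark")
}

class TestProcessLazy extends FlatSpec with DataFrameSuiteBase with Matchers {

  val p = new Processor

  // lazy: nothing touches `spark` while the class is being constructed;
  // the DataFrames are built on first access from inside a test body.
  lazy val df1: DataFrame =
    spark.createDataFrame(Seq(("a", 12, 98999), ("b", 42, 99)))
      .toDF("name", "id", "value")
  lazy val df2: DataFrame =
    spark.createDataFrame(Seq(("X", 12, "foo", "spark"), ("Z", 42, "bar", "storm")))
      .toDF("code", "id", "word", "engine")
  lazy val result: DataFrame = p.process(df1, df2)

  "process()" should "return only one row" in {
    result.count() shouldBe 1L
  }

  it should "return spark row" in {
    result.select("engine").collect().map(_.getString(0)) should contain("spark")
  }
}
```

The fixtures are written once and evaluated once, and the suite stays on FlatSpec, so the registration-time access to `sc` that fails under WordSpec never happens.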
