我在 PySpark 中的
src/my_funcs/
下有以下 UDF
@F.udf(returnType=T.ArrayType(T.ArrayType(T.StringType())))
def get_details(details_url: str) -> list[list[str]]:
details = requests.get(details_url)
details = details.json()
details = # Some logic
return details
现在我想对这个 UDF 进行单元测试。为此,我做了以下事情:
from unittest import mock
@mock.patch("src.my_funcs.requests.get")
def test_get_details(mock_requests_get, spark):
request_json_response = # Expected result of .json()
mock_requests_get.return_value = mock.Mock(**{"status_code":200, "json.return_value":request_json_response})
input_data = [(1, "random_invalid_url")]
input_schema = ["ID", "details_url"]
input_df = spark.createDataFrame(input_data, input_schema)
expected_data = # expected array
expected_schema = ["ID", "details_url", "details"]
expected_df = spark.createDataFrame(expected_data, expected_schema)
transformed_df = input_df.withColumn("details", get_details("details_url")
assert sorted(expected_df.collect()) == sorted(transformed_df.collect())
我不断收到错误消息
requests.exceptions.MissingSchema: Invalid URL 'random_invalid_url': No scheme supplied. Perhaps you meant https://random_invalid_url?
这个错误让我的模拟看起来不起作用。
如果我从 get_details 函数中删除 UDF 装饰器,并在没有 Spark 的情况下测试该函数,则模拟会起作用并且单元测试会成功。所以我认为该错误与 Spark 的工作方式有关。如何修复此问题并在 Spark DataFrame 上测试 UDF?错误是因为 Spark 执行 UDF 的方式导致的吗?我需要以不同的方式模拟它,还是不可能?
不确定是否相关,但 Spark 会话被创建为像这样的 pytest 固定装置
spark = SparkSession.builder.master("local[*]").appName("UnitTest").getOrCreate()
如果您想在
get_details
中对逻辑进行单元测试,我建议您将该代码提取到一个单独的函数中并进行测试(不需要 Spark 或模拟)。
如果您不想重构代码,可以保留
get_details
不变,但不要使用 @F.udf
装饰器,而是在将其传递给 Spark 方法之前将其作为函数调用(您将需要模拟请求,但仍会避免 Spark ):
udf_get_details = F.udf(get_details, returnType=T.ArrayType(T.ArrayType(T.StringType())))
transformed_df = input_df.withColumn("details", udf_get_details("details_url"))
即使您确实设法通过 Spark 运行它,您也只会: