使用 pytest 对 pyspark 代码运行单元测试。下面给出的代码中的代码片段示例。看起来像预期的 Spark 运行时或 hadoop 运行时库,但我认为单元测试并不真正需要 Spark 库。只需 pyspark python 包就足够了,因为像 Jenkins 这样的工具不会安装 Spark 运行时。请指导
def read_inputfile_from_ADLS(self):
try:
if self.segment == "US":
if self.input_path_2 is None or self.input_path_2 == "":
df = self.spark.read.format("delta").load(self.input_path)
else:
df = self.spark.read.format("delta").load(self.input_path_2)
except Exception as e:
resultmsg = "error reading input file"
Py测试代码
import pytest
from unittest.mock import patch,MagicMock , Mock
class TestInputPreprocessor:
inpprcr = None
dataframe_reader = 'pyspark.sql.readwriter.DataFrameReader'
def test_read_inputfile_from_ADLS(self,spark,tmp_path):
self.segment = 'US'
self.input_path_2 = tmp_path
with patch(f'{self.dataframe_reader}.format', MagicMock(autospec=True)) as
mock_adls_read:
self.inpprcr.read_inputfile_from_ADLS()
assert mock_adls_read.call_count == 1
错误:
AssertionError
---------------------------------------------- Captured stderr setup -------------------
---------------------------
23/07/12 23:58:42 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException:
java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see
https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
23/07/12 23:58:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your
platform... using builtin-java classes where applicable
修复了这个问题。必须下载 winutils.exe 并映射到 HADOOP_HOME 、 SPARK_HOME 到 python lib 中的 pyspark 位置
'C:\Users
无需在本地笔记本电脑上安装 Hadoop 或 Spark 进行单元测试