I am learning Spark, and as an assignment we have to build a wheel locally, install it in Databricks (I am using Azure Databricks), and test it by running it from a Databricks notebook. The program reads a CSV file (timezones.csv) that ships inside the wheel. The file really is inside the wheel (I checked), and the wheel works fine when I install it and run it from a Jupyter notebook on my local PC. However, when I install it in a Databricks notebook, it fails with the following error, as you can see in the snapshot below:
[PATH_NOT_FOUND] Path does not exist: dbfs:/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv. SQLSTATE: 42K03
File <command-3771510969632751>, line 7
3 from pyspark.sql import SparkSession
5 spark = SparkSession.builder.getOrCreate()
----> 7 flights_with_utc = aniade_hora_utc(spark, flights_df)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/agregaciones.py:25, in aniade_hora_utc(spark, df)
23 path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
24 #path_timezones = str(Path("resources") / "timezones.csv")
---> 25 timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)
27 # Append the timezones_df columns ("iata_code", "iana_tz", "windows_tz") to the right of
28 # the original df's columns, filling them only on rows where the origin airport (Origin)
29 # matches the iata_code column of timezones_df. If some Origin airport does not appear
30 # in timezones_df, the 3 columns are left as NULL
32 df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
45 start = time.perf_counter()
46 try:
---> 47 res = func(*args, **kwargs)
48 logger.log_success(
49 module_name, class_name, function_name, time.perf_counter() - start, signature
50 )
51 return res
File /databricks/spark/python/pyspark/sql/readwriter.py:830, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
828 if type(path) == list:
829 assert self._spark._sc._jvm is not None
--> 830 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
831 elif isinstance(path, RDD):
833 def func(iterator):
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
226 converted = convert_exception(e.java_exception)
227 if not isinstance(converted, UnknownException):
228 # Hide where the exception came from that shows a non-Pythonic
229 # JVM exception message.
--> 230 raise converted from None
231 else:
232 raise
Has anyone run into this problem before? Is there any fix? I tried installing the file both with pip and through the cluster's Libraries UI, but I get the same error, and I have also restarted the cluster a few times. Thanks in advance for your help. I am using Python 3.11, PySpark 3.5, and Java 8, and I build the wheel locally from PyCharm. If you need more details to answer, please ask and I will provide them.
I explained all the details above. I expected to be able to use the wheel I built locally from a Databricks notebook. Sorry for my English, it is not my native language and I am a bit rusty.
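For reference, this is roughly how I checked that the CSV is inside the wheel (a minimal sketch; the wheel file name is just an example of what my build produces in dist/):

import zipfile

wheel_path = "dist/motor_ingesta-0.1.0-py3-none-any.whl"  # example name, adjust to the actual build
with zipfile.ZipFile(wheel_path) as whl:
    for name in whl.namelist():
        if name.endswith(".csv"):
            print(name)  # expect motor_ingesta/resources/timezones.csv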
Maybe you are missing package_data in your setup initialization?! Something like this:
from pathlib import Path
from setuptools import setup, find_packages

# read, read_requirements, __version__ and the custom command classes are
# helpers defined elsewhere in this setup.py.
setup(
    name="daprep",
    version=__version__,
    author="",
    author_email="[email protected]",
    description="A short summary of the project",
    license="proprietary",
    url="",
    packages=find_packages("src"),
    package_dir={"": "src"},
    package_data={"daprep": ["res/**/*"]},
    long_description=read("README.md"),
    install_requires=read_requirements(Path("requirements.txt")),
    tests_require=[
        "pytest",
        "pytest-cov",
        "pre-commit",
    ],
    cmdclass={
        "dist": DistCommand,
        "test": TestCommand,
        "testcov": TestCovCommand,
    },
    platforms="any",
    python_requires=">=3.7",
    entry_points={
        "console_scripts": [
            "main_entrypoint = daprep.main:main_entrypoint",
        ]
    },
)
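Adapted to the package from your traceback, it would look something like this (just a sketch, assuming your sources live in a plain motor_ingesta/ package directory rather than under src/):

from setuptools import setup, find_packages

setup(
    name="motor_ingesta",
    packages=find_packages(),
    # Ship every CSV under motor_ingesta/resources inside the wheel, so that
    # str(Path(__file__).parent) + "/resources/timezones.csv" points at a real
    # file in site-packages after installation.
    package_data={"motor_ingesta": ["resources/*.csv"]},
)

After rebuilding the wheel, you can list its contents again to confirm that motor_ingesta/resources/timezones.csv is packaged.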