Databricks cannot find a CSV file inside an installed wheel when running from a Databricks notebook

Problem description

I am learning Spark and, as an assignment, we have to build a wheel locally, install it in Databricks (I am using Azure Databricks), and test it by running it from a Databricks notebook. The program reads a CSV file (timezones.csv) that is packaged inside the wheel. The file is inside the wheel (I checked), and the wheel works fine when I install it and run it from a Jupyter notebook on my local PC. However, when I install it in a Databricks notebook, I get the following error, as you can see in the snapshot below:

[PATH_NOT_FOUND] Path does not exist: dbfs:/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv. SQLSTATE: 42K03
File <command-3771510969632751>, line 7
      3 from pyspark.sql import SparkSession
      5 spark = SparkSession.builder.getOrCreate()
----> 7 flights_with_utc = aniade_hora_utc(spark, flights_df)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/agregaciones.py:25, in aniade_hora_utc(spark, df)
     23 path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
     24 #path_timezones = str(Path("resources") / "timezones.csv")
---> 25 timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)
     27 # Concatenate the timezones_df columns ("iata_code","iana_tz","windows_tz") to the right of
     28 # the original df's columns, copying only into rows where the origin airport (Origin) matches
     29 # the value of the iata_code column in timezones_df. If an Origin airport does not appear in timezones_df,
     30 # the 3 columns will be left as NULL
     32 df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res
File /databricks/spark/python/pyspark/sql/readwriter.py:830, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
    828 if type(path) == list:
    829     assert self._spark._sc._jvm is not None
--> 830     return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    831 elif isinstance(path, RDD):
    833     def func(iterator):
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
    226 converted = convert_exception(e.java_exception)
    227 if not isinstance(converted, UnknownException):
    228     # Hide where the exception came from that shows a non-Pythonic
    229     # JVM exception message.
--> 230     raise converted from None
    231 else:
    232     raise

[Databricks error snapshot 1] [Databricks error snapshot 2] Has anyone run into this problem before? Is there a solution? I tried installing the file both with pip and through the cluster Libraries UI and got the same error, and I also restarted the cluster a few times. Thanks in advance for your help. I am using Python 3.11, PySpark 3.5 and Java 8, and I build the wheel locally from PyCharm. If you need more details to answer, just ask and I will provide them.

I explained all the details above. I expected to be able to use the wheel I built locally from the Databricks notebook. Sorry if my English is off; it is not my native language and I am a bit rusty.

python csv apache-spark pyspark databricks
1 Answer

Maybe you are missing package_data in your setup() call?!


from pathlib import Path

from setuptools import find_packages, setup

setup(
    name="daprep",
    version=__version__,
    author="",
    author_email="[email protected]",
    description="A short summary of the project",
    license="proprietary",
    url="",
    packages=find_packages("src"),
    package_dir={"": "src"},
    # package_data is what makes setuptools copy the res/ files into the wheel
    package_data={"daprep": ["res/**/*"]},
    long_description=read("README.md"),
    install_requires=read_requirements(Path("requirements.txt")),
    tests_require=[
        "pytest",
        "pytest-cov",
        "pre-commit",
    ],
    cmdclass={
        "dist": DistCommand,
        "test": TestCommand,
        "testcov": TestCovCommand,
    },
    platforms="any",
    python_requires=">=3.7",
    entry_points={
        "console_scripts": [
            "main_entrypoint = daprep.main:main_entrypoint",
        ]
    },
)
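
For the asker's project, a minimal sketch of the same idea, assuming a flat layout where the package is named motor_ingesta and the CSV lives under motor_ingesta/resources/ (both names taken from the traceback), could look like this; the version number and layout are hypothetical:

from setuptools import find_packages, setup

setup(
    name="motor_ingesta",
    version="0.1.0",  # hypothetical version
    packages=find_packages(),
    # Without package_data (or a MANIFEST.in plus include_package_data=True),
    # setuptools packages only the .py files, so timezones.csv never reaches
    # site-packages on the cluster even though the code path points there.
    package_data={"motor_ingesta": ["resources/*.csv"]},
)

You can confirm the file actually made it into the wheel before installing it on the cluster by listing the archive, for example: unzip -l dist/motor_ingesta-*.whl | grep timezones.csv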