PySpark - error when reading a CSV and saving it to a Delta format folder (no pandas)

Problem description

New to PySpark, trying out the Parquet/Delta ecosystem.

I am trying to write a script that does the following:

  1. Read a CSV file into a Spark DataFrame.
  2. Save it as a Parquet file.
  3. Read the Parquet file saved above into a Spark DataFrame.
  4. Save it as a Delta format file/folder.
  5. Create a DeltaTable object from the Delta files above.
  6. Update/append to the table.

I can get through step 3, but step 4 errors out.

Script

import sys

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta import *
from delta.tables import *
from pyspark.sql.types import *

from pyspark import SparkFiles
from pyspark.context import SparkContext

spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .getOrCreate()
)

print("Kernel:", sys.executable)
print("Python version:", sys.version)
print("Spark version:", spark.version)
print("PySpark version:", pyspark.__version__)
print("PySpark Version :", spark.sparkContext.version)

csv_filename = "dataset/BankChurners.csv"

# read csv to spark dataframe
df_csv = spark.read.format("csv").option("header","true").option("inferSchema","true").load(csv_filename)

# save to parquet file
df_csv.write.parquet("output/BankChurners_spark.parquet", mode="overwrite")

# read parquet file to spark dataframe
df_parq = spark.read.parquet("output/BankChurners_spark.parquet")

# save to delta format file
df_parq.write.format("delta").save("output/BankChurners_delta_table")

# read delta table
deltaTable = DeltaTable.forPath(spark, "output/BankChurners_delta_table")

# add a record and save delta table
# did not find any example / syntax for adding a record

Output

Python version: 3.9.6 (default, Mar 29 2024, 10:51:09)
[Clang 15.0.0 (clang-1500.3.9.4)]
Spark version: 3.5.1
PySpark version: 3.5.1
PySpark Version : 3.5.1
...
...
...
Traceback (most recent call last):
  File "/Users/foobar/workspace/practice/parquet/question.py", line 31, in <module>
    df_parq.write.format("delta").save("output/BankChurners_delta_table")
  File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/sql/readwriter.py", line 1463, in save
    self._jwrite.save(path)
  File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o44.save.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormatWriter$Empty2Null

Questions

  1. How do I create a Delta table?

  2. For reading a file (CSV or Parquet) I have noticed two different approaches. Which one is preferred?

    i.

    df_csv = spark.read.format("csv").option("header","true").option("inferSchema","true").load(csv_filename)

    ii.

    df_csv = spark.read.csv(csv_filename, header=True, inferSchema=True)

  3. Is this the correct way to create an object for an existing Delta table?

    deltaTable = DeltaTable.forPath(spark, "/path/to/table")

  4. If the answer to the above is yes, how do I append a record to that table so that the table version is updated?

  5. Are any of the imported modules above unnecessary?

Feel free to comment on anything you think is incorrect.
Thanks

pyspark delta-lake delta
1 Answer

Since you are working locally, your problem is probably the Spark Delta configuration.


from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("MyApp")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()
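
Beyond the missing DeltaCatalog setting, the NoClassDefFoundError itself points to a version mismatch: io.delta:delta-core_2.12:2.1.0 is built against Spark 3.3.x, while your output shows Spark 3.5.1, where the Empty2Null class that Delta 2.1.0 references no longer lives in FileFormatWriter. For Spark 3.5.x you need Delta Lake 3.x, whose Maven artifact was renamed from delta-core to delta-spark. Installing the matching pip package (for example, pip install delta-spark==3.1.0 pairs with Spark 3.5.x) and letting configure_spark_with_delta_pip supply the jars, as above, avoids hard-coding spark.jars.packages to an incompatible version.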

``
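On question 4: appending is just a DataFrame write with mode("append"), and every committed write creates a new table version. A minimal sketch, assuming the table from the question already exists at output/BankChurners_delta_table; for illustration it re-appends a row copied from the table itself, which guarantees the schema matches, whereas a real record would come from new data:

from delta.tables import DeltaTable

path = "output/BankChurners_delta_table"

# append one row (copied from the table so the schema is guaranteed to match)
one_row = spark.read.format("delta").load(path).limit(1)
one_row.write.format("delta").mode("append").save(path)

# each committed write shows up as a new version in the table history
DeltaTable.forPath(spark, path).history().select("version", "operation").show()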
© www.soinside.com 2019 - 2024. All rights reserved.
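On questions 1 and 3: yes, df.write.format("delta").save(path) is a standard way to create a Delta table from a DataFrame, and DeltaTable.forPath(spark, path) is the standard way to get a handle on an existing one. That handle also supports in-place updates through update(); a sketch, where the column name and condition are hypothetical and need to be adapted to the BankChurners schema:

from pyspark.sql.functions import col, lit

deltaTable = DeltaTable.forPath(spark, "output/BankChurners_delta_table")

# hypothetical fix-up: clamp negative ages to zero
deltaTable.update(
    condition=col("Customer_Age") < 0,
    set={"Customer_Age": lit(0)},
)

On question 2: the two read styles are equivalent; spark.read.csv(path, header=True, inferSchema=True) is shorthand for the format("csv")/option(...)/load(...) chain, so it is a matter of preference. On question 5: SparkFiles and SparkContext are imported but never used, and from delta import * already brings in DeltaTable, so the separate delta.tables import should be redundant.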