New to PySpark, experimenting with the Parquet/Delta ecosystem.
I'm trying to write a script that does the following (per the comments in the code):
1. read a CSV file into a Spark DataFrame
2. save it as a Parquet file
3. read the Parquet file back into a Spark DataFrame
4. save it in Delta format
5. read the Delta table and append a record
I can get through step 3, but step 4 errors out.
Script:
import sys

import pyspark
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .getOrCreate()
)

print("Kernel:", sys.executable)
print("Python version:", sys.version)
print("Spark version:", spark.version)
print("PySpark version:", pyspark.__version__)
print("PySpark Version :", spark.sparkContext.version)
csv_filename = "dataset/BankChurners.csv"
# read csv to spark dataframe
df_csv = spark.read.format("csv").option("header","true").option("inferSchema","true").load(csv_filename)
# save to parquet file
df_csv.write.parquet("output/BankChurners_spark.parquet", mode="overwrite")
# read parquet file to spark dataframe
df_parq = spark.read.parquet("output/BankChurners_spark.parquet")
# save to delta format file
df_parq.write.format("delta").save("output/BankChurners_delta_table")
# read delta table
deltaTable = DeltaTable.forPath(spark, "output/BankChurners_delta_table")
# add a record and save delta table
# did not find any example / syntax for adding a record
Output:
Python version: 3.9.6 (default, Mar 29 2024, 10:51:09)
[Clang 15.0.0 (clang-1500.3.9.4)]
Spark version: 3.5.1
PySpark version: 3.5.1
PySpark Version : 3.5.1
...
...
...
Traceback (most recent call last):
File "/Users/foobar/workspace/practice/parquet/question.py", line 31, in <module>
df_parq.write.format("delta").save("output/BankChurners_delta_table")
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/sql/readwriter.py", line 1463, in save
self._jwrite.save(path)
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
return f(*a, **kw)
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o44.save.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormatWriter$Empty2Null
Questions:
How do I create a Delta table?
For reading a file (CSV or Parquet), I've noticed two different approaches. Which one is preferred?
I.
df_csv = spark.read.format("csv").option("header","true").option("inferSchema","true").load(csv_filename)
II.
df_csv = spark.read.csv(csv_filename, header=True, inferSchema=True)
Is this the correct way to create an object for an existing Delta table?
deltaTable = DeltaTable.forPath(spark, "/path/to/table")
If the answer to the above is yes, how do I append a record to that Delta table so that the table version is updated?
Are any of these imports unnecessary?
Please feel free to comment on anything you think is incorrect.
Thanks
Since you're working locally, your problem is most likely the Spark/Delta configuration: delta-core 2.1.0 targets Spark 3.3.x, so on Spark 3.5.1 it fails with exactly this kind of NoClassDefFoundError. For Spark 3.5.x you need Delta Lake 3.x (where the Maven artifact was renamed from delta-core to delta-spark). The simplest fix is to install the matching delta-spark pip package and let configure_spark_with_delta_pip wire up the correct jars:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("MyApp")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()