I'm running into a problem on Databricks: when I set a specific type in my schema and read JSON, the value comes through fine, but after saving my DataFrame and loading it again, the value is gone.
Here is sample code that reproduces the issue:
from pyspark.sql.types import StructType, StructField, StringType, MapType, DoubleType, ArrayType, LongType, BooleanType
import os
from pyspark.sql.functions import lit, current_date, date_sub, input_file_name, regexp_extract, split, expr, to_date, date_format, from_json, col
from pyspark.sql.utils import AnalysisException
from datetime import datetime, timedelta
from delta.tables import *
import logging
import tempfile
import inspect
import collections
import json
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
mockschema = StructType([
    StructField("bot", BooleanType(), True),
    StructField("channel", StringType(), True),
    StructField("chills", StringType(), True),
    StructField("cookies", MapType(StringType(), StringType()), True),
    StructField("geo", StructType([
        StructField("city", StringType(), True),
        StructField("country", StringType(), True),
        StructField("region", StringType(), True),
        # StructField("latitude", StringType(), True),  # this works, user_id is shown
        # StructField("longitude", StringType(), True)  # this works, user_id is shown
        StructField("latitude", DoubleType(), True),  # this does not work, user_id is null
        StructField("longitude", DoubleType(), True)  # this does not work, user_id is null
    ]), True),
    StructField("ip", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("ua", StringType(), True),
    StructField("url", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("vibes", StringType(), True)
])
mock_data = {
    "bot": False,
    "channel": "YES",
    "chills": "VERY_COLD",
    "cookies": {
        "1111": "aa",
        "2222": "bb",
        "3333": "cc",
        "4444": "dd",
        "5555": "ee",
        "6666": "ff",
    },
    "geo": {
        "city": "Such City",
        "country": "AA",
        "region": "WOWW",
        "latitude": -1.924868,
        "longitude": 25.424055
    },
    "ip": "123.45.678.90",
    "timestamp": 1711711760169,
    "ua": "SOME_BROWSER_USER_AGENT",
    "url": "www.some.url.com",
    "user_id": "123456789"
}
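# Quick sanity check (my addition, not part of the original repro): print the
# schema Spark infers for the mock dict before it is round-tripped through
# JSON; the nested geo dict may be inferred as a map rather than a struct.
spark.createDataFrame([mock_data]).printSchema()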
output_format = 'delta'
with tempfile.TemporaryDirectory() as d:
    spark.createDataFrame(
        [mock_data]
    ).write.mode("overwrite").format("json").save(d)
    df = spark.read.schema(mockschema).json(d)
    df = df.withColumn("date_partition", regexp_extract(input_file_name(), r'.*\/(\d{4}-\d{1,2}-\d{1,2})\/.*', 1))
    df = df.withColumn("date_partition", date_format(to_date("date_partition", "yyyy-M-d"), "yyyy-MM-dd"))  # format with leading zeros for better sorting
    df = df.withColumn("filename", expr("regexp_extract(input_file_name(), '[^/]+$', 0)"))
    display(df[['user_id', 'filename']])  # df before saving: user_id is present
    df_repartitioned = df.repartition("date_partition")
    df_repartitioned.write.format(output_format).mode("overwrite").save(f"{d}/df")
    logger.info("dumped df to temp folder")
    df = spark.read.format(output_format).load(f"{d}/df")
    display(df[['user_id', 'filename']])  # df after saving: user_id is null when geo uses DoubleType
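To make the intermediate state visible, here is a small check I sketched (it has to run inside the with block above, while the temp directory d still exists); it reads the JSON files Spark wrote back as plain text, before any schema is applied:

# Show the raw JSON lines Spark wrote, to check whether geo.latitude and
# geo.longitude were serialized as numbers or as quoted strings.
spark.read.text(d).show(truncate=False)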
In mockschema, setting the geo latitude/longitude fields to DoubleType makes user_id in the saved df come back as null, while setting them to StringType "solves" the problem.
I need to understand why this happens, because I have other code with the same problem of fields going missing after a save, and I can't track down why this schema triggers the error. Has anyone run into something similar and can explain it?
Thanks in advance <3
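One sketch I can think of for narrowing this down (the _corrupt_record name and the debug_* variables are mine, and it also has to run inside the with block while d exists): re-read the same JSON with a corrupt-record column, to see whether Spark treats the whole row as malformed under the DoubleType schema rather than just nulling one field:

from pyspark.sql.types import StructType, StructField, StringType

# Append a corrupt-record column to the schema; in PERMISSIVE mode (the
# default), records Spark cannot fully parse land in this column.
debug_schema = StructType(mockschema.fields + [StructField("_corrupt_record", StringType(), True)])
debug_df = (
    spark.read
    .schema(debug_schema)
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .json(d)
)
debug_df.select("user_id", "_corrupt_record").show(truncate=False)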
The "problem" can be worked around by using a different schema type, but the breakage shows up in the user_id field while the schema issue is in geo; we have other schemas as well, and we can't track which schema is interfering with which field.
I'd like to understand first what causes this behavior.
After saving, user_id should have the same values as before, without losing any data.
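To make that expectation concrete, a check like this should pass (sketch; df_before and df_after are hypothetical handles on the DataFrame right before the delta write and right after the reload):

# Invariant I expect from the save/load round trip: the user_id values
# survive unchanged. For the single mock row, ordering is not an issue.
before = [r["user_id"] for r in df_before.select("user_id").collect()]
after = [r["user_id"] for r in df_after.select("user_id").collect()]
assert before == after, f"user_id changed across the round trip: {before} != {after}"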