MongoDB 中的数据类型未通过 databricks Pyspark 更改(从字符串到日期)

问题描述 投票:0回答:1

我正在尝试使用 databricks pyspark 将多个文档加载到 MongoDb 集合中,在加载时我也使用 updateDate 归档,但加载后我可以看到 updateDate 字段数据类型是字符串而不是日期数据类型。

这里我使用时间戳代码。

import datetime

current_timestamp_utc = datetime.datetime.now(datetime.timezone.utc)
formatted_timestamp = current_timestamp_utc.strftime("%Y-%m-%dT%H:%M:%S")
timezone_offset = current_timestamp_utc.strftime("%z")
formatted_timestamp = formatted_timestamp + ".000" + timezone_offset[:-2] + ":" + 
timezone_offset[-2:]

print(formatted_timestamp)

result : 2024-04-03T07:33:52.000+00:00

结果看起来不错,但是加载到 MongoDb 后,它显示为字符串而不是日期。

那么您能否帮助我了解如何加载日期数据类型的文档。 我已使用 UpdateMany() 方法将字符串更改为日期数据类型,这是要继续的写入方法吗?使用 updatemany() 方法时是否有任何 I/O 或性能影响。请推荐

mongodb pyspark databricks azure-databricks
1个回答
0
投票

您可以使用 Spark SQL 日期时间函数直接获取当前时间,如下所示:

from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp

spark = SparkSession.builder.getOrCreate()

spark.sql("""select date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx") as updateDate""").show(truncate=False)

Output:
+-----------------------------+
|updateDate                   |
+-----------------------------+
|2024-04-04T09:04:35.865+00:00|
+-----------------------------+

Schema:
root
 |-- updateDate: string (nullable = false)

如果您注意到模式,

updateDate
是一个字符串,您可以使用
to_timestamp()
将其转换为时间戳,如下所示:

from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp, to_timestamp

spark = SparkSession.builder.getOrCreate()

spark.sql("""select to_timestamp(date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx")) as updateDate""").show(truncate=False)

Output:
+-----------------------+
|updateDate             |
+-----------------------+
|2024-04-04 09:04:12.703|
+-----------------------+

Schema:
root
 |-- updateDate: timestamp (nullable = true)

现在

updateDate
是根据您的 Spark 会话时区调整的时间戳(这就是
+00:00
偏移量现在消失的原因) - 顺便说一句,您可以使用
spark.conf.set("spark.sql.session.timeZone", "<enter-timezone-here>")
进行更新。

如果您想将其作为新列添加到现有数据框中,您可以执行以下操作:

from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1,), (2,)], ["rownum"]) # replace this with your dataframe

df = df.withColumn("updateDate", date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx").cast("timestamp"))

df.show(truncate=False)
df.printSchema()

Output:
+------+-----------------------+
|rownum|updateDate             |
+------+-----------------------+
|1     |2024-04-04 09:04:48.473|
|2     |2024-04-04 09:04:48.473|
+------+-----------------------+

Schema:
root
 |-- rownum: long (nullable = true)
 |-- updateDate: timestamp (nullable = true)

您要查找的数据类型是 Spark 中带有时区的时间戳。现在,您可以尝试使用此模式将数据集加载到 MongoDB 中。

© www.soinside.com 2019 - 2024. All rights reserved.