我正在尝试使用 databricks pyspark 将多个文档加载到 MongoDb 集合中,在加载时我也使用 updateDate 归档,但加载后我可以看到 updateDate 字段数据类型是字符串而不是日期数据类型。
这里我使用时间戳代码。
import datetime
current_timestamp_utc = datetime.datetime.now(datetime.timezone.utc)
formatted_timestamp = current_timestamp_utc.strftime("%Y-%m-%dT%H:%M:%S")
timezone_offset = current_timestamp_utc.strftime("%z")
formatted_timestamp = formatted_timestamp + ".000" + timezone_offset[:-2] + ":" +
timezone_offset[-2:]
print(formatted_timestamp)
result : 2024-04-03T07:33:52.000+00:00
结果看起来不错,但是加载到 MongoDb 后,它显示为字符串而不是日期。
那么您能否帮助我了解如何加载日期数据类型的文档。 我已使用 UpdateMany() 方法将字符串更改为日期数据类型,这是要继续的写入方法吗?使用 updatemany() 方法时是否有任何 I/O 或性能影响。请推荐
您可以使用 Spark SQL 日期时间函数直接获取当前时间,如下所示:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp
spark = SparkSession.builder.getOrCreate()
spark.sql("""select date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx") as updateDate""").show(truncate=False)
Output:
+-----------------------------+
|updateDate |
+-----------------------------+
|2024-04-04T09:04:35.865+00:00|
+-----------------------------+
Schema:
root
|-- updateDate: string (nullable = false)
如果您注意到模式,
updateDate
是一个字符串,您可以使用to_timestamp()
将其转换为时间戳,如下所示:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp, to_timestamp
spark = SparkSession.builder.getOrCreate()
spark.sql("""select to_timestamp(date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx")) as updateDate""").show(truncate=False)
Output:
+-----------------------+
|updateDate |
+-----------------------+
|2024-04-04 09:04:12.703|
+-----------------------+
Schema:
root
|-- updateDate: timestamp (nullable = true)
现在
updateDate
是根据您的 Spark 会话时区调整的时间戳(这就是 +00:00
偏移量现在消失的原因) - 顺便说一句,您可以使用 spark.conf.set("spark.sql.session.timeZone", "<enter-timezone-here>")
进行更新。
如果您想将其作为新列添加到现有数据框中,您可以执行以下操作:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ["rownum"]) # replace this with your dataframe
df = df.withColumn("updateDate", date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx").cast("timestamp"))
df.show(truncate=False)
df.printSchema()
Output:
+------+-----------------------+
|rownum|updateDate |
+------+-----------------------+
|1 |2024-04-04 09:04:48.473|
|2 |2024-04-04 09:04:48.473|
+------+-----------------------+
Schema:
root
|-- rownum: long (nullable = true)
|-- updateDate: timestamp (nullable = true)
您要查找的数据类型是 Spark 中带有时区的时间戳。现在,您可以尝试使用此模式将数据集加载到 MongoDB 中。