在 Spark 中将日期转换为 ISO 周日期

问题描述 投票:0回答:4

在一列中有日期,如何创建包含 ISO 周日期的列?

ISO 星期日期由 年份周数工作日组成。

  • year 与使用
    year
    函数获取的年份不同。
  • 周数是简单的部分 - 可以使用
    weekofyear
    获得。
  • weekday 应在周一返回 1,在周日返回 7,而 Spark 的
    dayofweek
    则无法做到这一点。

示例数据框:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    ('1977-12-31',),
    ('1978-01-01',),
    ('1978-01-02',),
    ('1978-12-31',),
    ('1979-01-01',),
    ('1979-12-30',),
    ('1979-12-31',),
    ('1980-01-01',)],
    ['my_date']
).select(F.col('my_date').cast('date'))

df.show()
#+----------+
#|   my_date|
#+----------+
#|1977-12-31|
#|1978-01-01|
#|1978-01-02|
#|1978-12-31|
#|1979-01-01|
#|1979-12-30|
#|1979-12-31|
#|1980-01-01|
#+----------+

想要的结果:

+----------+-------------+
|   my_date|iso_week_date|
+----------+-------------+
|1977-12-31|   1977-W52-6|
|1978-01-01|   1977-W52-7|
|1978-01-02|   1978-W01-1|
|1978-12-31|   1978-W52-7|
|1979-01-01|   1979-W01-1|
|1979-12-30|   1979-W52-7|
|1979-12-31|   1980-W01-1|
|1980-01-01|   1980-W01-2|
+----------+-------------+
apache-spark date pyspark apache-spark-sql spark3
4个回答
6
投票

Spark SQL

extract
使这变得更加容易。

  • iso_year
    =
    F.expr("EXTRACT(YEAROFWEEK FROM my_date)")
  • iso_weekday
    =
    F.expr("EXTRACT(DAYOFWEEK_ISO FROM my_date)")

因此,使用

concat_ws
构建其他答案:

import pyspark.sql.functions as F

df.withColumn(
    'iso_week_date',
    F.concat_ws(
        "-",
        F.expr("EXTRACT(YEAROFWEEK FROM my_date)"),
        F.lpad(F.weekofyear('my_date'), 3, "W0"),
        F.expr("EXTRACT(DAYOFWEEK_ISO FROM my_date)")
    )
).show()

#+----------+-------------+
#|   my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31|   1977-W52-6|
#|1978-01-01|   1977-W52-7|
#|1978-01-02|   1978-W01-1|
#|1978-12-31|   1978-W52-7|
#|1979-01-01|   1979-W01-1|
#|1979-12-30|   1979-W52-7|
#|1979-12-31|   1980-W01-1|
#|1980-01-01|   1980-W01-2|
#+----------+-------------+

4
投票

你的解决方案已经很好了,也许你可以通过简化计算来缩短它:

  • iso_weekday
    =
    (dayofweek(my_date) + 5)%7 + 1
  • iso_year
    =
    year(date_add(my_date, 4 - iso_weekday))

这给你:

import pyspark.sql.functions as F

df.withColumn(
    'iso_week_date',
    F.concat_ws(
        "-",
        F.year(F.expr("date_add(my_date, 4 - (dayofweek(my_date) + 5) % 7 + 1)")),
        F.lpad(F.weekofyear('my_date'), 3, "W0"),
        (F.dayofweek('my_date') + 5) % 7 + 1
    )
).show()

#+----------+-------------+
#|   my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31|   1977-W52-6|
#|1978-01-01|   1977-W52-7|
#|1978-01-02|   1978-W01-1|
#|1978-12-31|   1978-W52-7|
#|1979-01-01|   1979-W01-1|
#|1979-12-30|   1979-W52-7|
#|1979-12-31|   1980-W01-1|
#|1980-01-01|   1980-W01-2|
#+----------+-------------+

0
投票

首先,可以为yearweekday的列创建规则。然后,使用

concat_ws
lpad
将它们连接起来。

week_from_prev_year = (F.month('my_date') == 1) & (F.weekofyear('my_date') > 9)
week_from_next_year = (F.month('my_date') == 12) & (F.weekofyear('my_date') == 1)
iso_year = F.when(week_from_prev_year, F.year('my_date') - 1) \
            .when(week_from_next_year, F.year('my_date') + 1) \
            .otherwise(F.year('my_date'))
iso_weekday = F.when(F.dayofweek('my_date') != 1, F.dayofweek('my_date')-1).otherwise(7)
iso_week_date = F.concat_ws('-', iso_year, F.lpad(F.weekofyear('my_date'), 3, 'W0'), iso_weekday)
df2 = df.withColumn('iso_week_date', iso_week_date)

df2.show()
#+----------+-------------+
#|   my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31|   1977-W52-6|
#|1978-01-01|   1977-W52-7|
#|1978-01-02|   1978-W01-1|
#|1978-12-31|   1978-W52-7|
#|1979-01-01|   1979-W01-1|
#|1979-12-30|   1979-W52-7|
#|1979-12-31|   1980-W01-1|
#|1980-01-01|   1980-W01-2|
#+----------+-------------+

0
投票

另一种解决方案,使用

.withColumn()
连接年份、“-W”和 iso 周数,其中第 1-9 周用“0”填充。

import pyspark.sql.functions as f

df.withColumn("iso_week_date", f.concat(f.year(col('my_date')), lit("-W"), f.lpad(f.weekofyear(col('my_date')), 2, "0")))
© www.soinside.com 2019 - 2024. All rights reserved.