在Python极坐标中将时间序列数据从一个df插值到另一个df的时间轴

问题描述 投票:0回答:2

我有不同数据帧中不同时间轴上的时间序列数据。我需要将数据从一个

df
插入到另一个 df
df_ref
的时间轴上。例如:

import polars as pl

# DataFrame with the reference time axis:
df_ref = pl.DataFrame({"dt": ["2022-12-14T14:00:01.000", "2022-12-14T14:00:02.000",
                              "2022-12-14T14:00:03.000", "2022-12-14T14:00:04.000",
                              "2022-12-14T14:00:05.000", "2022-12-14T14:00:06.000"]})
df_ref = df_ref.with_columns(pl.col("dt").str.strptime(pl.Datetime).cast(pl.Datetime))

# DataFrame with a different frequency time axis, to be interpolated onto the reference time axis:
df = pl.DataFrame({
        "dt": ["2022-12-14T14:00:01.500", "2022-12-14T14:00:03.500", "2022-12-14T14:00:05.500"],
        "v1": [1.5, 3.5, 5.5]})
df = df.with_columns(pl.col("dt").str.strptime(pl.Datetime).cast(pl.Datetime))

我无法

join
dfs,因为键不匹配:

print(df_ref.join(df, on="dt", how="left").interpolate())
shape: (6, 2)
┌─────────────────────┬──────┐
│ dt                  ┆ v1   │
│ ---                 ┆ ---  │
│ datetime[μs]        ┆ f64  │
╞═════════════════════╪══════╡
│ 2022-12-14 14:00:01 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:02 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:03 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:04 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:05 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:06 ┆ null │
└─────────────────────┴──────┘

所以我的“迭代”方法是单独插入每一列,例如

from scipy.interpolate import interp1d

f = interp1d(df["dt"].dt.timestamp(), df["v1"],
             kind="linear", bounds_error=False, fill_value="extrapolate")

out = f(df_ref["dt"].dt.timestamp())
df_ref = df_ref.with_columns(pl.Series(out).alias("v1_interp"))

print(df_ref.head(6))
shape: (6, 2)
┌─────────────────────┬───────────┐
│ dt                  ┆ v1_interp │
│ ---                 ┆ ---       │
│ datetime[μs]        ┆ f64       │
╞═════════════════════╪═══════════╡
│ 2022-12-14 14:00:01 ┆ 1.0       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:02 ┆ 2.0       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:03 ┆ 3.0       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:04 ┆ 4.0       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:05 ┆ 5.0       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:06 ┆ 6.0       │
└─────────────────────┴───────────┘

虽然这给出了我需要的结果,但我想知道是否有更惯用的方法?我不愿意在这里提及效率,因为我还没有用真实数据对其进行基准测试(“测量,不要猜测!”)。但是,我认为底层 Rust 代码中的本机实现可以增加一些性能优势。

python dataframe time-series interpolation python-polars
2个回答
4
投票

scipy.interpolate.interpol1d
示例最终调用此函数。

您可以使用相同的方法并使用

.map()

处理每一列
def polars_ip(df_ref, df):
   old = df["dt"].dt.timestamp().to_numpy()
   new = df_ref["dt"].dt.timestamp().to_numpy()

   hi = np.searchsorted(old, new).clip(1, len(old) - 1)
   lo = hi - 1

   def _interp(column):
      column = column.to_numpy()
      slope = (column[hi] - column[lo]) / (old[hi] - old[lo])
      return pl.Series(slope * (new - old[lo]) + column[lo])
      
   values = (
      pl.concat([df, df_ref], how="diagonal")
        .select(pl.exclude("dt").map(_interp))
   )
   values.columns = [f"{name}_ref_ip" for name in values.columns]
   
   return df_ref.hstack(values)  
>>> %time polars_ip(df_ref, df)
CPU times: user 48.1 ms, sys: 20.4 ms, total: 68.5 ms
Wall time: 22 ms
shape: (85536, 11)
>>> %time scipy_ip(df_ref, df)
CPU times: user 75.5 ms, sys: 5.51 ms, total: 81 ms
Wall time: 74.3 ms
shape: (85536, 11)

检查它们返回相同的值:

>>> polars_ip(df_ref, df).frame_equal(scipy_ip(df_ref, df))
True

您还可以使用以下方法生成相同的值:

N_COLS = 10
names = list(map(str, range(N_COLS)))
has_reading = pl.col(names[0]).is_not_null()
has_no_reading = has_reading.is_not()
(
   pl.concat([df, df_ref], how="diagonal")
   .sort("dt")
   .with_columns([
      pl.when(has_reading).then(pl.all())
        .shift(-1).backward_fill().suffix("_hi"),
      pl.when(has_reading).then(pl.all())
        .shift(+1).forward_fill().suffix("_lo")
      ])
   .with_columns([
      pl.when(has_reading).then(pl.col(r"^.+_hi$"))
        .forward_fill().backward_fill(),
      pl.when(has_reading).then(pl.col(r"^.+_lo$"))
        .backward_fill().forward_fill()
      ])
   .filter(has_no_reading)
   .with_column(
      pl.col(r"^dt.*$").dt.timestamp().suffix("_ts"))
   .with_columns([
      (((pl.col(f"{name}_hi")  - pl.col(f"{name}_lo")) 
         / (pl.col("dt_hi_ts") - pl.col("dt_lo_ts")))
         * (pl.col("dt_ts")    - pl.col("dt_lo_ts")) 
         + pl.col(f"{name}_lo"))
         .alias(f"{name}_ref_ip") for name in names
      ])
   .select([
      pl.col("dt"), pl.col("^.+_ref_ip$")
   ])
)  

1
投票

第一种方法

我认为这是其中一个问题,您想要创建比实际想要的多得多的行,然后过滤回您想要的行。

因为 Polars 的

interpolate
函数仅计算已知值之间的缺失值,而不是向前和向后推断,所以让我们第一步手动推断 df1 以在前后添加额外的行。

df1=df1.lazy()
df1=pl.concat([df1,
        df1.sort('dt').with_row_count('n') \
            .select(
                    [pl.col('n')] + \
                    [pl.when(pl.col('n')<=1) \
                       .then(pl.col(x)-(pl.col(x).shift(-1)-pl.col(x))) \
                    .when(pl.col('n')>=pl.col('n').max()-1) \
                       .then(pl.col(x)+(pl.col(x)-pl.col(x).shift(1)))
                               for x in df1.columns]
                ) \
        .filter((pl.col('n')==0) | (pl.col('n')==pl.col('n').max())) \
        .select(pl.exclude('n'))]).sort('dt')

我在

select
中使用列表理解,因此这应该可以扩展到任意数量的列。

接下来要做的是创建一个带有 dt 列的 df,该列从最早的 dt 开始,以 df0 和 df1 之间时间差最小的最新 dt 结束。通过修复关键列中的差异,它可以让 Polars 的

interpolate
按您的预期工作。

specs = pl.concat([df0.select('dt'),df1.select('dt')]) \
          .sort('dt').select([
                       pl.col('dt').min().alias('mindt'),
                       pl.col('dt').max().alias('maxdt'), 
                       (pl.col('dt')-pl.col('dt').shift()).min().alias('mindiff')
                      ]).collect()


newdf = pl.DataFrame({'dt':pl.date_range(specs[0,0], specs[0,1], specs[0,2])}).lazy()

或者,您可以使用列表理解来创建

newdf
,以防 dt 不是日期时间
pl.DataFrame({'dt': [specs[0,0] + specs[0,2]*x for x in range(int(1+(specs[0,1]-specs[0,0])/specs[0,2]))]}).lazy()

这样,您就可以在它和两个 dfs 之间进行外连接,然后使用嵌入的

interpolate
来获取您要查找的所有值。您可以链接一个过滤器并在最后进行选择以匹配您的输出。

newdf = newdf.join(df0, on='dt', how='outer') \
    .join(df1, on='dt', how='outer') \
    .with_columns([pl.col(x).interpolate().suffix('_interp') for x in df1.columns if x != 'dt']) \
    .filter(~pl.col('v0').is_null()).select(pl.exclude('v1')) \
    .collect()

第二种方法

解决该问题的另一种方法是使用一堆shift和whenthen语句从本质上重新创建scipy插值函数...

首先进行对角连接,然后添加一堆表示 dt 和 v1 列但已移位的辅助列,一对用于向前移位,另一对用于向后移位。然后通过时间差计算

v1
的变化,然后将其向前和向后结转。几乎最后是开始/结束/中间行的whenthen逻辑。最后,过滤并选择辅助列。

    pl.concat([df0, df1], how='diagonal').sort('dt') \
    .with_column(pl.when(~pl.col('v1').is_null()).then(pl.col('dt')).alias('v1dt')) \
    .with_columns([
        pl.col('v1').fill_null(strategy='forward').alias('v1_for'),
        pl.col('v1dt').fill_null(strategy='forward').alias('v1dt_for'),
        pl.col('v1').fill_null(strategy='backward').alias('v1_back'),
        pl.col('v1dt').fill_null(strategy='backward').alias('v1dt_back')
        ]) \
    .with_column(((pl.col('v1_back')-pl.col('v1_for'))/(pl.col('v1dt_back')-pl.col('v1dt_for'))).alias('diff')) \
    .with_column((pl.when(pl.col('diff').is_nan()).then(None).otherwise(pl.col('diff'))).alias('diff')) \
    .with_column(pl.col('diff').fill_null(strategy='forward').fill_null(strategy='backward')) \
    .with_column((pl.when(~pl.col('v1').is_null()).then(pl.col('v1')) \
                .when((~pl.col('v1_for').is_null()) & (~pl.col('v1_back').is_null())) \
                        .then((pl.col('dt')-pl.col('v1dt_for'))*pl.col('diff')+pl.col('v1_for')) \
                .when(~pl.col('v1_back').is_null()) \
                        .then(pl.col('v1_back')-(pl.col('v1dt_back')-pl.col('dt'))*pl.col('diff')) \
                .otherwise(pl.col('v1_for')+(pl.col('dt')-pl.col('v1dt_for'))*pl.col('diff'))).alias('v1_interp')) \
    .filter(~pl.col('v0').is_null()).select(['dt','v0','v1_interp'])
© www.soinside.com 2019 - 2024. All rights reserved.