我有不同数据帧中不同时间轴上的时间序列数据。我需要将数据从一个
df
插入到另一个 df df_ref
的时间轴上。例如:
import polars as pl
# DataFrame with the reference time axis:
df_ref = pl.DataFrame({"dt": ["2022-12-14T14:00:01.000", "2022-12-14T14:00:02.000",
"2022-12-14T14:00:03.000", "2022-12-14T14:00:04.000",
"2022-12-14T14:00:05.000", "2022-12-14T14:00:06.000"]})
df_ref = df_ref.with_columns(pl.col("dt").str.strptime(pl.Datetime).cast(pl.Datetime))
# DataFrame with a different frequency time axis, to be interpolated onto the reference time axis:
df = pl.DataFrame({
"dt": ["2022-12-14T14:00:01.500", "2022-12-14T14:00:03.500", "2022-12-14T14:00:05.500"],
"v1": [1.5, 3.5, 5.5]})
df = df.with_columns(pl.col("dt").str.strptime(pl.Datetime).cast(pl.Datetime))
我无法
join
dfs,因为键不匹配:
print(df_ref.join(df, on="dt", how="left").interpolate())
shape: (6, 2)
┌─────────────────────┬──────┐
│ dt ┆ v1 │
│ --- ┆ --- │
│ datetime[μs] ┆ f64 │
╞═════════════════════╪══════╡
│ 2022-12-14 14:00:01 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:02 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:03 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:04 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:05 ┆ null │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-12-14 14:00:06 ┆ null │
└─────────────────────┴──────┘
所以我的“迭代”方法是单独插入每一列,例如
from scipy.interpolate import interp1d
f = interp1d(df["dt"].dt.timestamp(), df["v1"],
kind="linear", bounds_error=False, fill_value="extrapolate")
out = f(df_ref["dt"].dt.timestamp())
df_ref = df_ref.with_columns(pl.Series(out).alias("v1_interp"))
print(df_ref.head(6))
shape: (6, 2)
┌─────────────────────┬───────────┐
│ dt ┆ v1_interp │
│ --- ┆ --- │
│ datetime[μs] ┆ f64 │
╞═════════════════════╪═══════════╡
│ 2022-12-14 14:00:01 ┆ 1.0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:02 ┆ 2.0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:03 ┆ 3.0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:04 ┆ 4.0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:05 ┆ 5.0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-12-14 14:00:06 ┆ 6.0 │
└─────────────────────┴───────────┘
虽然这给出了我需要的结果,但我想知道是否有更惯用的方法?我不愿意在这里提及效率,因为我还没有用真实数据对其进行基准测试(“测量,不要猜测!”)。但是,我认为底层 Rust 代码中的本机实现可以增加一些性能优势。
scipy.interpolate.interpol1d
示例最终调用此函数。
您可以使用相同的方法并使用
.map()
处理每一列
def polars_ip(df_ref, df):
old = df["dt"].dt.timestamp().to_numpy()
new = df_ref["dt"].dt.timestamp().to_numpy()
hi = np.searchsorted(old, new).clip(1, len(old) - 1)
lo = hi - 1
def _interp(column):
column = column.to_numpy()
slope = (column[hi] - column[lo]) / (old[hi] - old[lo])
return pl.Series(slope * (new - old[lo]) + column[lo])
values = (
pl.concat([df, df_ref], how="diagonal")
.select(pl.exclude("dt").map(_interp))
)
values.columns = [f"{name}_ref_ip" for name in values.columns]
return df_ref.hstack(values)
>>> %time polars_ip(df_ref, df)
CPU times: user 48.1 ms, sys: 20.4 ms, total: 68.5 ms
Wall time: 22 ms
shape: (85536, 11)
>>> %time scipy_ip(df_ref, df)
CPU times: user 75.5 ms, sys: 5.51 ms, total: 81 ms
Wall time: 74.3 ms
shape: (85536, 11)
检查它们返回相同的值:
>>> polars_ip(df_ref, df).frame_equal(scipy_ip(df_ref, df))
True
您还可以使用以下方法生成相同的值:
N_COLS = 10
names = list(map(str, range(N_COLS)))
has_reading = pl.col(names[0]).is_not_null()
has_no_reading = has_reading.is_not()
(
pl.concat([df, df_ref], how="diagonal")
.sort("dt")
.with_columns([
pl.when(has_reading).then(pl.all())
.shift(-1).backward_fill().suffix("_hi"),
pl.when(has_reading).then(pl.all())
.shift(+1).forward_fill().suffix("_lo")
])
.with_columns([
pl.when(has_reading).then(pl.col(r"^.+_hi$"))
.forward_fill().backward_fill(),
pl.when(has_reading).then(pl.col(r"^.+_lo$"))
.backward_fill().forward_fill()
])
.filter(has_no_reading)
.with_column(
pl.col(r"^dt.*$").dt.timestamp().suffix("_ts"))
.with_columns([
(((pl.col(f"{name}_hi") - pl.col(f"{name}_lo"))
/ (pl.col("dt_hi_ts") - pl.col("dt_lo_ts")))
* (pl.col("dt_ts") - pl.col("dt_lo_ts"))
+ pl.col(f"{name}_lo"))
.alias(f"{name}_ref_ip") for name in names
])
.select([
pl.col("dt"), pl.col("^.+_ref_ip$")
])
)
第一种方法
我认为这是其中一个问题,您想要创建比实际想要的多得多的行,然后过滤回您想要的行。
因为 Polars 的
interpolate
函数仅计算已知值之间的缺失值,而不是向前和向后推断,所以让我们第一步手动推断 df1 以在前后添加额外的行。
df1=df1.lazy()
df1=pl.concat([df1,
df1.sort('dt').with_row_count('n') \
.select(
[pl.col('n')] + \
[pl.when(pl.col('n')<=1) \
.then(pl.col(x)-(pl.col(x).shift(-1)-pl.col(x))) \
.when(pl.col('n')>=pl.col('n').max()-1) \
.then(pl.col(x)+(pl.col(x)-pl.col(x).shift(1)))
for x in df1.columns]
) \
.filter((pl.col('n')==0) | (pl.col('n')==pl.col('n').max())) \
.select(pl.exclude('n'))]).sort('dt')
我在
select
中使用列表理解,因此这应该可以扩展到任意数量的列。
接下来要做的是创建一个带有 dt 列的 df,该列从最早的 dt 开始,以 df0 和 df1 之间时间差最小的最新 dt 结束。通过修复关键列中的差异,它可以让 Polars 的
interpolate
按您的预期工作。
specs = pl.concat([df0.select('dt'),df1.select('dt')]) \
.sort('dt').select([
pl.col('dt').min().alias('mindt'),
pl.col('dt').max().alias('maxdt'),
(pl.col('dt')-pl.col('dt').shift()).min().alias('mindiff')
]).collect()
newdf = pl.DataFrame({'dt':pl.date_range(specs[0,0], specs[0,1], specs[0,2])}).lazy()
或者,您可以使用列表理解来创建
newdf
,以防 dt 不是日期时间 pl.DataFrame({'dt': [specs[0,0] + specs[0,2]*x for x in range(int(1+(specs[0,1]-specs[0,0])/specs[0,2]))]}).lazy()
这样,您就可以在它和两个 dfs 之间进行外连接,然后使用嵌入的
interpolate
来获取您要查找的所有值。您可以链接一个过滤器并在最后进行选择以匹配您的输出。
newdf = newdf.join(df0, on='dt', how='outer') \
.join(df1, on='dt', how='outer') \
.with_columns([pl.col(x).interpolate().suffix('_interp') for x in df1.columns if x != 'dt']) \
.filter(~pl.col('v0').is_null()).select(pl.exclude('v1')) \
.collect()
第二种方法
解决该问题的另一种方法是使用一堆shift和whenthen语句从本质上重新创建scipy插值函数...
首先进行对角连接,然后添加一堆表示 dt 和 v1 列但已移位的辅助列,一对用于向前移位,另一对用于向后移位。然后通过时间差计算
v1
的变化,然后将其向前和向后结转。几乎最后是开始/结束/中间行的whenthen逻辑。最后,过滤并选择辅助列。
pl.concat([df0, df1], how='diagonal').sort('dt') \
.with_column(pl.when(~pl.col('v1').is_null()).then(pl.col('dt')).alias('v1dt')) \
.with_columns([
pl.col('v1').fill_null(strategy='forward').alias('v1_for'),
pl.col('v1dt').fill_null(strategy='forward').alias('v1dt_for'),
pl.col('v1').fill_null(strategy='backward').alias('v1_back'),
pl.col('v1dt').fill_null(strategy='backward').alias('v1dt_back')
]) \
.with_column(((pl.col('v1_back')-pl.col('v1_for'))/(pl.col('v1dt_back')-pl.col('v1dt_for'))).alias('diff')) \
.with_column((pl.when(pl.col('diff').is_nan()).then(None).otherwise(pl.col('diff'))).alias('diff')) \
.with_column(pl.col('diff').fill_null(strategy='forward').fill_null(strategy='backward')) \
.with_column((pl.when(~pl.col('v1').is_null()).then(pl.col('v1')) \
.when((~pl.col('v1_for').is_null()) & (~pl.col('v1_back').is_null())) \
.then((pl.col('dt')-pl.col('v1dt_for'))*pl.col('diff')+pl.col('v1_for')) \
.when(~pl.col('v1_back').is_null()) \
.then(pl.col('v1_back')-(pl.col('v1dt_back')-pl.col('dt'))*pl.col('diff')) \
.otherwise(pl.col('v1_for')+(pl.col('dt')-pl.col('v1dt_for'))*pl.col('diff'))).alias('v1_interp')) \
.filter(~pl.col('v0').is_null()).select(['dt','v0','v1_interp'])