我有以下数据:
# Sample input: climbs to 0.8, falls to -1.2, then jumps up and back down.
data = [0.1, 0.2, 0.3, 0.4 , 0.5, 0.6, 0.7, 0.8, 0.5, 0.2, 0.1, -0.1,
-0.2, -0.3, -0.4, -0.5, -0.6, -0.7, -0.9, -1.2, -0.1, -0.7]
每当数据点相对于上一个记录值的变化超过步长时,我都希望记录下新值;如果没有超过,我希望保留旧值,直到累积变化至少等于步长为止。我可以像这样迭代地实现:
import pandas as pd
from copy import deepcopy
import numpy as np
# Hold the last accepted value until the absolute change from it exceeds `step`.
step = 0.5
df_steps = pd.Series(data)   # will be overwritten in place with the held values
df = df_steps.copy()         # untouched copy of the raw input values
yesterday = None
# Series.items() replaces Series.iteritems(), which was removed in pandas 2.0.
# deepcopy() of the integer index was a no-op cost and has been dropped.
for today in df_steps.index:
    if yesterday is not None:
        # Compare the raw value at `today` with the last accepted value.
        if abs(df.loc[today] - df_steps.loc[yesterday]) > step:
            df_steps.loc[today] = df.loc[today]           # big enough: accept new value
        else:
            df_steps.loc[today] = df_steps.loc[yesterday]  # hold the old value
    yesterday = today
我的最终结果是:
[0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.7, 0.7, 0.7, 0.7, 0.1, 0.1, 0.1, 0.1, 0.1, -0.5, -0.5, -0.5, -0.5, -1.2, -0.1, -0.7]
问题与疑问
问题是这是迭代实现的(我同意这里的第二个答案)。我的疑问是:如何以向量化方式实现相同目标?
尝试
我的尝试如下,但与结果不符:
(df.diff().cumsum().replace(np.nan, 0) / step).astype(int)
由于纯向量化方法似乎并不容易实现,我们可以使用 numba 把代码编译到 C 级别,从而得到一种虽然带循环但非常高效的方法。下面是使用 numba 的 nopython 模式的一种实现:
from numba import njit, float64
@njit('float64[:](float64[:], float32)')
def set_at_cum_change(a, step):
    """Return a copy of `a` where each value is held until the absolute
    change from the last accepted value exceeds `step`.

    a    : 1-D float64 array of raw values.
    step : threshold; a new value is accepted only when
           abs(current - last_accepted) > step.
    """
    out = np.zeros(len(a), dtype=float64)
    # Guard the empty case: numba's nopython mode does not bounds-check by
    # default, so `a[0]` on an empty array would be undefined behaviour.
    if len(a) == 0:
        return out
    prev = a[0]       # last accepted value
    out[0] = a[0]
    for i in range(1, len(a)):
        current = a[i]
        if np.abs(current - prev) > step:
            out[i] = current    # change exceeded the step: accept the new value
            prev = current
        else:
            out[i] = out[i - 1]  # hold the previous output value
    return out
在同一数组上进行的测试给出:
# Same data as above, now as a float64 NumPy array (matching the njit signature).
data = np.array([0.1, 0.2, 0.3, 0.4 , 0.5, 0.6, 0.7, 0.8, 0.5, 0.2, 0.1, -0.1,
-0.2, -0.3, -0.4, -0.5, -0.6, -0.7, -0.9, -1.2, -0.1, -0.7])
out = set_at_cum_change(data,step= 0.5)
print(out)
array([ 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.7, 0.7, 0.7, 0.7, 0.1,
0.1, 0.1, 0.1, 0.1, -0.5, -0.5, -0.5, -0.5, -1.2, -0.1, -0.7])
如果我们比较运行时间,会发现在长度为 22000 的数组上,numba 方法带来了大约 110000 倍的巨大加速。这不仅表明 numba 在这类场景下是一个很好的选择,也说明使用 pandas 的 iterrows/iteritems 几乎总是一个坏主意:
def op(data):
    """Original iterative baseline: hold the last accepted value until the
    absolute change from it exceeds the step (hard-coded to 0.5).

    data : sequence of numbers.
    Returns the held values as a NumPy array.
    """
    step = 0.5
    df_steps = pd.Series(data)   # overwritten in place with the held values
    df = df_steps.copy()         # untouched copy of the raw input values
    yesterday = None
    # Series.items() replaces Series.iteritems(), removed in pandas 2.0;
    # the deepcopy() of the integer index in the original was a pure no-op cost.
    for today in df_steps.index:
        if yesterday is not None:
            # Compare the raw value at `today` with the last accepted value.
            if abs(df.loc[today] - df_steps.loc[yesterday]) > step:
                df_steps.loc[today] = df.loc[today]
            else:
                df_steps.loc[today] = df_steps.loc[yesterday]
        yesterday = today
    return df_steps.to_numpy()
def fn(step):
    """Coroutine: receive values via send() and yield the currently held
    value, switching to the incoming value only once it differs from the
    held one by more than `step`."""
    held = float('inf')  # sentinel: the first real value always differs enough
    incoming = yield
    while True:
        if abs(held - incoming) > step:
            held = incoming
        incoming = yield held
def andrej(data):
    """Generator-based approach: feed every value through fn(0.5) via
    Series.apply and return the held values as a NumPy array.

    The original discarded the computed column, which made correctness
    checks against the other methods impossible; returning it is a
    backward-compatible fix (callers that ignored the None still work).
    """
    df = pd.DataFrame({'data': data})
    f = fn(0.5)
    next(f)  # prime the coroutine so f.send() can be used
    df['new_data'] = df['data'].apply(lambda x: f.send(x))
    return df['new_data'].to_numpy()
# Benchmark on a larger input: tile the 22-element sample 1000 times.
data_large = np.tile(data, 1_000)
print(data_large.shape)
# (22000,)
# Sanity check: the slow pandas loop and the numba version agree.
np.allclose(op(data_large), set_at_cum_change(data_large, step=0.5))
# True
# NOTE: %timeit is an IPython magic — these lines only run in IPython/Jupyter.
%timeit op(data_large)
# 5.78 s ± 329 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit andrej(data_large)
# 13.6 ms ± 1.53 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit set_at_cum_change(data_large, step=0.5)
# 50.4 µs ± 1.8 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
这不是向量化,但是此解决方案避免了deepcopy()
和各种.loc
方法,因此应该更快:
# Same 22-value sample as in the question.
data = [0.1, 0.2, 0.3, 0.4 , 0.5, 0.6, 0.7, 0.8, 0.5, 0.2, 0.1, -0.1, -0.2, -0.3, -0.4, -0.5, -0.6, -0.7, -0.9, -1.2, -0.1, -0.7]
def fn(step):
    """Generator coroutine that yields the last accepted value, accepting a
    newly sent value only when it deviates from the accepted one by more
    than `step`."""
    accepted = float('inf')  # ensures the very first sent value is accepted
    value = yield
    while True:
        exceeded = abs(accepted - value) > step
        if exceeded:
            accepted = value
        value = yield accepted
# Build a DataFrame, prime the coroutine, then stream each value through it.
df = pd.DataFrame({'data': data})
f = fn(0.5)
next(f)  # advance to the first yield so f.send() can be used
df['new_data'] = df['data'].apply(lambda x: f.send(x))
print(df)
打印:
data new_data
0 0.1 0.1
1 0.2 0.1
2 0.3 0.1
3 0.4 0.1
4 0.5 0.1
5 0.6 0.1
6 0.7 0.7
7 0.8 0.7
8 0.5 0.7
9 0.2 0.7
10 0.1 0.1
11 -0.1 0.1
12 -0.2 0.1
13 -0.3 0.1
14 -0.4 0.1
15 -0.5 -0.5
16 -0.6 -0.5
17 -0.7 -0.5
18 -0.9 -0.5
19 -1.2 -1.2
20 -0.1 -0.1
21 -0.7 -0.7