I have a sample dataframe with labels and timestamps, as shown below:
timestamps labels
0 2023-08-01 00:00:00 A
1 2023-08-01 03:00:00 B
2 2023-08-01 06:00:00 C
3 2023-08-01 09:00:00 A
4 2023-08-01 12:00:00 B
5 2023-08-01 15:00:00 C
6 2023-08-01 18:00:00 A
7 2023-08-01 21:00:00 B
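I want to get the counts of the N most frequent labels (here N = 10) over the whole dataset, as an array. Then I want to look at the dataframe through a rolling time window such as '14D' and get the array of counts of the N most frequent labels within that window. The 11th element of both arrays is 1 minus the normalized sum of the array (this part can be ignored). Finally, I want to compute the Euclidean distance between the two arrays and plot it against the timestamps.
The solution below works, but since I have a very large dataset, iterating over the rolling windows takes a long time. I also have to ignore all computed distances until the timestamps reach the first time window. Is there a better, faster way to do this?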
import numpy as np
import pandas as pd
from scipy.spatial.distance import euclidean

def get_label_array(label_series, top_N=10):
    label_counts = label_series.value_counts().head(top_N)
    other_count = len(label_series) - label_counts.sum()
    label_arr = label_counts.values
    label_arr = np.append(label_arr, other_count)
    normalized_label_arr = label_arr / np.sum(label_arr)
    return normalized_label_arr

def calculate_rolling_dist(df, timestamps, time_window='14D', top_N=10):
    datetimestamps = pd.to_datetime(timestamps)
    df.set_index(datetimestamps, inplace=True)
    top_N_lft = get_label_array(df['labels'], top_N)
    rolling_result = []
    rolling_ts = []
    for window_start in df.index[:-1]:
        rolling_ts.append(window_start)
        print(window_start)
        window_end = window_start + pd.Timedelta(time_window)
        window_data = df.loc[window_start:window_end]
        if len(window_data) > 0:
            lft = get_label_array(window_data['labels'], top_N)
            if len(lft) >= 10:
                rolling_result.append(euclidean(top_N_lft, lft))
    return np.array(rolling_result), np.array(rolling_ts)
pandas 1.5.1
numpy 1.23.4
numba 0.57.1
python 3.11.0
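Without asking about the meaning of what exactly you are trying to do, I'll try to answer purely in terms of execution technique.
So, let's prepare some data to work with. I assume you have a sequence of symbols, e.g. uppercase Latin letters, indexed by timestamps with a predefined frequency, e.g. 3 hours. This last assumption will matter when we get rid of the index and work in Numpy only. As for the symbols, I assume they are categorical values that can be replaced by numbers in a fixed range.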
import pandas as pd
import numpy as np
from numba import njit
from scipy.spatial.distance import euclidean
from string import ascii_uppercase
from timeit import Timer
from numpy.lib.stride_tricks import as_strided
from numpy.random import default_rng
rng = default_rng(seed=0)
total = 3*8*14*10 # 8x3hours(1day) * 14days * 10times
df = pd.DataFrame({
    'timestamp': pd.date_range('2023', periods=total, freq='3H')
    , 'label': rng.choice([*ascii_uppercase], size=total)
})
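The assumption that labels can be replaced by numbers in a fixed range is not limited to single letters. As a minimal sketch (the `labels` values are made up for illustration and are not from the question's data), `np.unique` with `return_inverse=True` maps arbitrary labels to integer codes `0..k-1`, which is the form `np.bincount` expects:

```python
import numpy as np

# hypothetical labels, used only to illustrate the encoding step
labels = np.array(['cat', 'dog', 'cat', 'bird', 'dog', 'cat'])

# uniques is sorted; codes[i] is the position of labels[i] in uniques
uniques, codes = np.unique(labels, return_inverse=True)

# one count per distinct label, in the same order as uniques
counts = np.bincount(codes, minlength=len(uniques))
```

With codes like these, the number of distinct labels (`len(uniques)`) plays the role that 26 plays for uppercase letters.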
Also, I hope you'll forgive me for taking the liberty of using other names:
- get_label_array I call nfrequent
- calculate_rolling_dist I call rolling_distance
- top_N I call simply n
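The first function, which returns the truncated frequency table, can be simplified with pandas.Series.value_counts; note the normalize=True parameter. I also delegate as much of the iteration as possible to Numpy and Pandas: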
def nfrequent(seq, n):
    '''get n most frequent items in descending order
    with normalized frequency and all the rest at the end'''
    vc = seq.value_counts(normalize=True)    # ascending=False by default
    return pd.concat([
        vc.iloc[:n],
        pd.Series({'rest': vc.iloc[n:].sum()})
    ])

def rolling_distance(seq, window='14D', n=10):
    total_nfreq = nfrequent(seq, n)
    n = len(total_nfreq) - 1    # in case total_nfreq is short
    return seq.map(ord).rolling(window).agg(
        lambda x: (
            euclidean(total_nfreq, window_nfreq)
            if len((window_nfreq:=nfrequent(x, n))) > n
            else np.nan
        )
    )
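Notes on the code:
- The data is expected as a Series indexed by datetime, because in general we can't predict the format of dates passed as strings.
- top_N=10: the length check inside the window compares against n rather than a hard-coded 10.
- seq.map(ord): rolling aggregation in Pandas works on numeric data only, so the letters are mapped to their character codes first.

Check the performance and save the result:

seq = df.set_index('timestamp').squeeze()
ans_pd = rolling_distance(seq)    # use this to check with other results
print(Timer(lambda: rolling_distance(seq)).autorange())

On my hardware, this task takes 3.47 seconds to complete. That looks like too much. An outer for loop is even less efficient (3.97 seconds). So let's look at other options.

More Numpy

Let's use Numpy instead of Pandas inside nfrequent and get rid of the if-else statement in the rolling aggregation. Also, I think seq.map(ord) is better done outside the rolling_distance function: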
def nfrequent(seq, n, count_unique=26):    # 26 is len(string.ascii_uppercase)
    '''get n most frequent items in descending order, n < count_unique
    with normalized frequency and all the rest at the end'''
    # rolling passes each window as float64, while bincount needs integers
    counts = np.bincount(np.asarray(seq, dtype=np.int64), minlength=count_unique)
    counts[::-1].sort()    # sort in place in descending order
    counts[n] = counts[n:].sum()    # fold everything after the top n into one slot
    return counts[:n+1] / counts[:n+1].sum()

def rolling_distance(seq, window='14D', n=10):
    total_nfreq = nfrequent(seq, n)
    return seq.rolling(window).agg(
        lambda window: euclidean(total_nfreq, nfrequent(window, n))
    )
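To see what the bincount/sort/fold steps produce, here is a small run that can be checked by hand. The helper is a standalone copy of the bincount-based logic so the snippet runs on its own; the input codes and `count_unique=4` are made up for illustration.

```python
import numpy as np

def nfrequent(seq, n, count_unique=26):
    # count occurrences of each integer code
    counts = np.bincount(seq, minlength=count_unique)
    # sorting the reversed view ascending leaves counts in descending order
    counts[::-1].sort()
    # fold everything after the top n into a single "rest" slot
    counts[n] = counts[n:].sum()
    return counts[:n+1] / counts[:n+1].sum()

# hypothetical codes: 0 occurs three times, 1 twice, 2 and 3 once each
codes = np.array([0, 0, 0, 1, 1, 2, 3])
freq = nfrequent(codes, n=2, count_unique=4)
# sorted counts are [3, 2, 1, 1]; the top two are kept and the rest sums to 2,
# so freq holds 3/7, 2/7 and 2/7
```

Note that the result only keeps the frequencies, not which labels they belong to, so the distance compares the shape of the two distributions rather than label-by-label counts.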
# prepare the data, collect the result, check the performance
seq = df.set_index('timestamp').squeeze().map(ord)
ans_np = rolling_distance(seq)
print(Timer(lambda: rolling_distance(seq)).autorange())

This time the performance on my side is much better: 358 ms versus 3.5 s before. Can Numba help? Not at this point: there would be problems, and performance would drop because of the overhead of converting the data to pandas.Series. Let's find another way to create rolling windows, without Pandas.

Numpy stride tricks

The idea is to build a strided view of the data as a two-dimensional numpy.ndarray. The first index of this array is the window number; the second is the position within the window.

def nfrequent(seq, n, count_unique=26):    # 26 is len(string.ascii_uppercase)
    '''get n most frequent items in descending order, n < count_unique
    with normalized frequency and all the rest at the end'''
    counts = np.bincount(seq, minlength=count_unique)
    counts[::-1].sort()    # sort in place in descending order
    counts[n] = counts[n:].sum()
    return counts[:n+1] / counts[:n+1].sum()

def rolling_distance(arr, window, step=1, n=10):
    # use the arr argument rather than a global, and honor the step
    windows = strided_view(arr, window, step)
    total_nfreq = nfrequent(arr, n)
    distance = np.empty(windows.shape[0], dtype=float)
    for i in range(distance.size):
        distance[i] = euclidean(total_nfreq, nfrequent(windows[i], n))
    return distance

def strided_view(arr, window, step=1):
    n = arr.size
    itemsize = arr.itemsize
    ksteps = 1 + (n - window) // step    # number of complete windows
    return as_strided(
        arr
        , shape=(ksteps, window)
        , strides=(itemsize*step, itemsize)
        , writeable=False
    )

Now we prepare the data as a numpy array and the sliding-window parameters as integers (i.e. we convert a string like '14D' into the number of labels that fit into one window):

# map the labels to numbers from 0 to 25
seq = df.set_index('timestamp').squeeze().map(ord) - ord('A')
# save the timestamps of the index to restore them later if needed
index, seq = seq.index, seq.to_numpy()
freq = pd.Timedelta('3H')
interval = pd.Timedelta('14D')
window = interval // freq    # number of items in a window
ans_strided = rolling_distance(seq, window)    # save the data to compare
print(Timer(lambda: rolling_distance(seq, window)).autorange())

This time I see somewhat better performance: 120 ms, versus 358 ms and 3.5 s previously.

Numpy stride tricks and Numba
@njit
def nfrequent(seq, n, count_unique=26):    # 26 is len(string.ascii_uppercase)
    '''get n most frequent items in descending order, n < count_unique
    with normalized frequency and all the rest at the end'''
    counts = np.bincount(seq, minlength=count_unique)
    counts[::-1].sort()    # sort in place in descending order
    counts[n] = counts[n:].sum()
    return counts[:n+1] / counts[:n+1].sum()

@njit
def rolling_distance(seq, windows, n=10):
    # seq and the strided windows are both passed as parameters,
    # so the compiled function does not depend on a global array
    total_nfreq = nfrequent(seq, n)
    distance = np.empty(windows.shape[0], dtype=np.float64)
    for i in range(distance.size):
        delta = total_nfreq - nfrequent(windows[i], n)
        distance[i] = np.sqrt(delta @ delta)
    return distance

def strided_view(arr, width, step=1):
    n = arr.size
    itemsize = arr.itemsize
    ksteps = 1 + (n - width) // step    # number of complete windows
    return as_strided(
        arr
        , shape=(ksteps, width)
        , strides=(itemsize*step, itemsize)
        , writeable=False
    )

# prepare data, collect result, check performance
seq = df.set_index('timestamp').squeeze().map(ord) - ord('A')
index, seq = seq.index, seq.to_numpy()
freq = pd.Timedelta('3H')
interval = pd.Timedelta('14D')
width = interval // freq
windows = strided_view(seq, width)
ans_numba_strided = rolling_distance(seq, windows)    # the first call also compiles
print(Timer(lambda: rolling_distance(seq, windows)).autorange())

This time it takes 4.76 ms on my test data, which is better than all the other variants.

Data unevenly distributed in time

Now we can slide along the bins, i.e. the first index of the data, to collect the distribution of letters over a given time period.
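One possible reading of this idea, as a rough sketch rather than a definitive implementation: count the labels per fixed-width time bin into a 2-D array (bins by labels), then slide a window over the bin axis and sum the counts inside it. Everything named here is an assumption made for illustration: the function names `binned_counts`, `top_n_freq` and `rolling_distance_binned`, timestamps given as a `datetime64` array, labels already encoded as integers in `0..count_unique-1`, bins aligned to the first timestamp, and a window expressed as a whole number of bins. A window with no data at all would yield nan.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def binned_counts(timestamps, codes, bin_width, count_unique):
    """Count labels per time bin; result has shape (n_bins, count_unique)."""
    bins = ((timestamps - timestamps.min()) // bin_width).astype(np.int64)
    n_bins = bins.max() + 1
    # flatten (bin, label) pairs into one index so a single bincount suffices
    flat = bins * count_unique + codes
    counts = np.bincount(flat, minlength=n_bins * count_unique)
    return counts.reshape(n_bins, count_unique)

def top_n_freq(counts, n):
    """n largest counts in descending order plus the sum of the rest, normalized."""
    ordered = -np.sort(-counts, axis=-1)
    rest = ordered[..., n:].sum(axis=-1, keepdims=True)
    top = np.concatenate([ordered[..., :n], rest], axis=-1)
    return top / top.sum(axis=-1, keepdims=True)

def rolling_distance_binned(timestamps, codes, bin_width, bins_per_window,
                            n, count_unique):
    counts = binned_counts(timestamps, codes, bin_width, count_unique)
    # slide along the first axis (bins); the window axis is appended last
    windows = sliding_window_view(counts, bins_per_window, axis=0)
    window_counts = windows.sum(axis=-1)    # (n_windows, count_unique)
    total = top_n_freq(counts.sum(axis=0), n)
    return np.linalg.norm(top_n_freq(window_counts, n) - total, axis=-1)

# hypothetical, unevenly spaced data: nothing at all on 2023-08-02
ts = np.array(['2023-08-01T00', '2023-08-01T03', '2023-08-01T20',
               '2023-08-03T05', '2023-08-03T06', '2023-08-04T12'],
              dtype='datetime64[h]')
codes = np.array([0, 1, 0, 2, 0, 1])
dist = rolling_distance_binned(ts, codes, np.timedelta64(1, 'D'),
                               bins_per_window=2, n=2, count_unique=3)
```

Here the work per window no longer depends on how many rows fall into it, only on the number of bins and distinct labels. The price is that window edges are rounded to bin boundaries.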