计算Python数据框中标记数据的滚动时间窗口的欧几里德距离

问题描述 投票:0回答:1

我有一个带有标签和时间戳的示例数据框,如下所示:

            timestamps labels
0  2023-08-01 00:00:00     A
1  2023-08-01 03:00:00     B
2  2023-08-01 06:00:00     C
3  2023-08-01 09:00:00     A
4  2023-08-01 12:00:00     B
5  2023-08-01 15:00:00     C
6  2023-08-01 18:00:00     A
7  2023-08-01 21:00:00     B

我想以数组的形式获取整个数据集的 N 个(此处为 10 个)最常见标签的计数。然后我想在“14D”这样的滚动时间窗口中查看数据帧,并获取该窗口中 N 个最常见标签的计数数组。两个数组中的第 11 个元素都是 1 - 数组的归一化总和(可以忽略这部分)。然后我想计算两个数组之间的欧几里得距离并将其与时间戳进行绘制。

所以这个解决方案是有效的,但由于我有一个非常大的数据集,迭代滚动窗口需要很长时间。另外,我必须忽略所有计算的距离,直到时间戳到达第一个时间窗口。有没有更好、更快的方法来做到这一点?

import numpy as np
import pandas as pd
from scipy.spatial.distance import euclidean

def get_label_array(label_series, top_N=10):
    label_counts = label_series.value_counts().head(top_N)
    other_count = len(label_series) - label_counts.sum()
    
    label_arr = label_counts.values
    label_arr = np.append(label_arr, other_count)
    normalized_label_arr = label_arr / np.sum(label_arr)
    return normalized_label_arr

def calculate_rolling_dist(df, timestamps, time_window='14D', top_N=10):
    datetimestamps = pd.to_datetime(timestamps)
    df.set_index(datetimestamps, inplace=True)
    top_N_lft = get_label_array(df['labels'], top_N)
    rolling_result = []
    rolling_ts = []
    for window_start in df.index[:-1]:
        rolling_ts.append(window_start)   
        print(window_start)   
        window_end = window_start + pd.Timedelta(time_window)
        window_data = df.loc[window_start:window_end]
        if len(window_data) > 0:
            lft = get_label_array(window_data['labels'], top_N)
            if len(lft) >= 10:
                rolling_result.append(euclidean(top_N_lft,lft))
    return np.array(rolling_result), np.array(rolling_ts)


python pandas dataframe rolling-computation euclidean-distance
1个回答
0
投票

pandas 1.5.1
numpy 1.23.4
numba 0.57.1
python 3.11.0


滚动窗口聚合

在不询问您到底想做什么的含义的情况下,我会尝试纯粹根据执行技术来回答。

那么,让我们准备一些要使用的数据。我假设你有一系列符号,例如大写拉丁字母,按预定义频率的时间戳进行索引,例如3小时。当我们摆脱索引而只在 Numpy 中工作时,最后一个假设将很重要。至于符号,我假设它们是分类值,可以用固定范围内的数字替换。

import pandas as pd
import numpy as np
from numba import njit

from scipy.spatial.distance import euclidean
from string import ascii_uppercase
from timeit import Timer
from numpy.lib.stride_tricks import as_strided
from numpy.random import default_rng
rng = default_rng(seed=0)


total = 3*8*14*10    # 8x3hours(1day) * 14days * 10times 
df = pd.DataFrame({
    'timestamp': pd.date_range('2023', periods=total, freq='3H')
    , 'label': rng.choice([*ascii_uppercase], size=total)
})

另外,我希望你能原谅我允许自己使用其他名字:

  • get_label_array
    我打电话给
    nfrequent
  • calculate_rolling_dist
    我打电话给
    rolling_distance
  • top_N
    我只是打电话
    n

熊猫的滚动窗户

我们可以使用 pandas.Series.value_counts 简化返回有限频率表的第一个函数,请注意

normalize=True
参数。另外,我委托 Numpy 和 Pandas 尽可能多的迭代:

def nfrequent(seq, n):
    '''get n most frequent items in discending order
    with normalized frequency and all the rest at the end'''
    vc = seq.value_counts(normalize=True)   # ascending=False by default
    return pd.concat([
        vc.iloc[:n],
        pd.Series({'rest': vc.iloc[n:].sum()})
    ])

def rolling_distance(seq, window='14D', n=10):
    total_nfreq = nfrequent(seq, n)
    n = len(total_nfreq) - 1    # in case if total_nfreq is short
    return seq.map(ord).rolling(window).agg(
        lambda x: (
            euclidean(total_nfreq, window_nfreq)
            if len((window_nfreq:=nfrequent(x, n))) > n
            else np.nan
         ) 
    )

代码注释:

  • 函数应该假设所有日期都已转换为
    datetime
    ,因为通常我们无法预测作为字符串传递的日期的格式。
  • 我们还可以假设传递给第二个函数的数据是一组以时间戳作为索引的符号。
    我认为参数的默认值应该在外部函数中声明一次以避免混淆。你在两者中都定义了
  • Series
  • ,这是未来令人头痛的根源。
    我使用 
  • top_N=10
  • 使数据聚合与自定义函数在滚动窗口上工作。否则我们需要在外循环中迭代窗口。
    
    
  • 让我们记下测试数据上的速度:

seq.map(ord)

在我的硬件上,该任务需要 3.47 秒才能完成。看起来太多了。外部 for 循环的效率甚至更低(3.97 秒)。所以让我们看看其他选择。

更多 Numpy

让我们尝试最小化

seq = df.set_index('timestamp').squeeze() ans_pd = rolling_distance(seq) # use this to check with other results print(Timer(lambda: rolling_distance(seq)).autorange())

中的Pandas,并摆脱滚动聚合中的if-else语句。另外,我认为

nfrequent
seq.map(ord)
功能更好:
rolling_distance

检查性能并保存结果:

def nfrequent(seq, n, count_unique=26): # 26 is len(string.asci_uppercase) '''get n most frequent items in discending order, n <= count_unique with normalized frequency and all the rest at the end''' counts = np.bincount(seq, minlength=count_unique) counts[::-1].sort() # sort inplace in reversed order counts[n] = counts[n:].sum() return counts[:n+1] / counts[:n+1].sum() def rolling_distance(seq, window='14D', n=10): total_nfreq = nfrequent(seq, n) return seq.rolling(window).agg( lambda window: euclidean(total_nfreq, nfrequent(window, n)) )

这次我这边的表现好多了,358 毫秒,而之前是 3.5 秒。努巴能帮忙吗?时机不对。 
seq = df.set_index('timestamp').squeeze().map(ord) ans_np = rolling_distance(seq) print(Timer(lambda: rolling_distance(seq)).autorange())

会有问题,并且由于将数据转换为

pandas.Series
很麻烦,性能会下降。让我们找出另一种在没有 Pandas 的情况下创建滚动窗口的方法。
Numpy 切片技巧

如果我们将标签序列视为二维数组怎么办?考虑如下:

该数组的第一个索引是窗口号;
  • 第二个是窗口中元素的编号。 我们可以使用
  • numpy.lib.stride_tricks.as_strided
  • 来完成此任务。为此,我们需要具有一定频率的数据,以便给定时间间隔的所有窗口都具有相同数量的标签。作为一个额外的优势,我们现在可以使用步骤来减少要循环的窗口数量:
  • numpy.ndarray
现在我们将数据准备为 numpy 数组,并将滑动窗口的参数准备为整数(即,我们将像 
def nfrequent(seq, n, count_unique=26): # 26 is len(string.asci_uppercase) '''get n most frequent items in discending order, n <= count_unique with normalized frequency and all the rest at the end''' #counts = np.zeros(count_unique) counts = np.bincount(seq, minlength=count_unique) counts[::-1].sort() # sort inplace in reversed order counts[n] = counts[n:].sum() return counts[:n+1] / counts[:n+1].sum() def rolling_distance(arr, window, step=1, n=10): windows = strided_view(seq, window) total_nfreq = nfrequent(seq, n) distance = np.empty(windows.shape[0] ,dtype=float) for i in range(distance.size): distance[i] = euclidean(total_nfreq, nfrequent(windows[i], n)) return distance def strided_view(arr, window, step=1): n = arr.size itemsize = arr.itemsize ksteps = 1 + (n - window) // step return as_strided( arr , shape=(ksteps, window) , strides=(itemsize*step, itemsize) , writeable=False )

这样的字符串转换为多个标签,以像一个窗口一样):

'14D'

这次我看到性能稍微好一些,120 毫秒,而之前的时间为 358 毫秒和 3.5 秒。

Numpy stryde 技巧和 Numba

上面的代码可以修改为使用 Numba 运行:

# map the labels by numbers from 0 to 25 seq = df.set_index('timestamp').squeeze().map(ord) - ord('A') # save the timestamps in index to restore lately if needed index, seq = seq.index, seq.to_numpy() freq = pd.Timedelta('3H') interval = pd.Timedelta('14D') window = interval // freq # number of items in a window ans_strided = rolling_distance(seq, window) # save the data to compare print(Timer(lambda: rolling_distance(seq, window))).autorange())

这次我的测试数据为 4.76 毫秒,比其他所有数据都好。

数据随时间分布不均匀

如果数据随时间分布不均匀,我们可能会尝试将其分类为时间间隔,例如:

@njit def nfrequent(seq, n, count_unique=26): # 26 is len(string.asci_uppercase) '''get n most frequent items in discending order, n <= count_unique with normalized frequency and all the rest at the end''' #counts = np.zeros(count_unique) counts = np.bincount(seq, minlength=count_unique) counts[::-1].sort() # sort inplace in reversed order counts[n] = counts[n:].sum() return counts[:n+1] / counts[:n+1].sum() @njit def rolling_distance(windows, n=10): # pass strided windows as a parameter total_nfreq = nfrequent(seq, n) distance = np.empty(windows.shape[0], dtype=float) for i in range(distance.size): delta = total_nfreq - nfrequent(windows[i], n) distance[i] = np.sqrt(delta @ delta) return distance def strided_view(arr, width, step=1): n = arr.size itemsize = arr.itemsize ksteps = 1 + (n - width) // step return as_strided( arr , shape=(ksteps, width) , strides=(itemsize*step, itemsize) , writeable=False ) # prepare data, collect result, check performance seq = df.set_index('timestamp').squeeze().map(ord) - ord('A') index, seq = seq.index, seq.to_numpy() freq = pd.Timedelta('3H') interval = pd.Timedelta('14D') width = interval // freq windows = strided_view(seq, width) ans_numba_strided = rolling_distance(windows) print(Timer(lambda: rolling_distance(windows))).autorange())

现在我们可以沿着箱滑动,即数据的第一个索引,以收集给定时间段内字母的分布。

© www.soinside.com 2019 - 2024. All rights reserved.