Create groups in time series data based on potential start and end boolean columns (vectorized solution)

Question (votes: 0, answers: 6)

My DataFrame is structured as follows:

   group  maybe_start  maybe_end
0    ABC        False      False
1    ABC         True      False
2    ABC        False      False
3    ABC        False      False
4    ABC         True      False
5    ABC        False      False
6    ABC        False       True
7    ABC        False      False
8    DEF        False      False
9    DEF        False      False
10   DEF         True      False
11   DEF        False      False
12   DEF        False       True
13   DEF        False      False
14   DEF        False      False
15   DEF        False       True
16   DEF         True      False
17   DEF        False      False
18   DEF        False       True

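I need to create a separate column, say `group2`, that records groups defined by start and end moments. Each group in `group2` should start at the first True value of `maybe_start` that appears after the previous `maybe_end==True`, and end at the first occurrence of `maybe_end==True` after that start. In other words, we start a new value in `group2` where `maybe_start==True` (row 1 in this example), and the following rows of `group2` get the same value until `maybe_end==True` occurs (row 6 here). All of this needs to be done within a groupby, where the groups are based on the `group` column. The expected output should look like this: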

   group  maybe_start  maybe_end  group2
0    ABC        False      False     NaN
1    ABC         True      False     1.0
2    ABC        False      False     1.0
3    ABC        False      False     1.0
4    ABC         True      False     1.0
5    ABC        False      False     1.0
6    ABC        False       True     1.0
7    ABC        False      False     NaN
0    DEF        False      False     NaN
1    DEF        False      False     NaN
2    DEF         True      False     1.0
3    DEF        False      False     1.0
4    DEF        False       True     1.0
5    DEF        False      False     NaN
6    DEF        False      False     NaN
7    DEF        False       True     NaN
8    DEF         True      False     2.0
9    DEF        False      False     2.0
10   DEF        False       True     2.0 

How can I achieve this in pandas in a vectorized way?

python pandas vectorization
6 Answers

1 vote

You can try:

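import numpy as np


def fn(x):
    # g is the current group number; state is True while a group is open
    out, g, state = [], 1, False
    for start, end in zip(x.maybe_start, x.maybe_end):
        if not state and start:
            # the first start after the previous end opens a new group
            out.append(g)
            state = True
        elif state and end:
            # the first end after the start closes the group (end row included)
            out.append(g)
            state = False
            g += 1
        elif state:
            out.append(g)
        else:
            out.append(np.nan)

    x['group2'] = out
    return x


out = df.groupby('group', group_keys=False).apply(fn)
print(out)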

Prints:

   group  maybe_start  maybe_end  group2
0    ABC        False      False     NaN
1    ABC         True      False     1.0
2    ABC        False      False     1.0
3    ABC        False      False     1.0
4    ABC         True      False     1.0
5    ABC        False      False     1.0
6    ABC        False       True     1.0
7    ABC        False      False     NaN
8    DEF        False      False     NaN
9    DEF        False      False     NaN
10   DEF         True      False     1.0
11   DEF        False      False     1.0
12   DEF        False       True     1.0
13   DEF        False      False     NaN
14   DEF        False      False     NaN
15   DEF        False       True     NaN
16   DEF         True      False     2.0
17   DEF        False      False     2.0
18   DEF        False       True     2.0

1 vote

import pandas as pd
import numpy as np

df = pd.DataFrame({
    'group': ['ABC'] * 8 + ['DEF'] * 11,
    'maybe_start': [False, True, False, False, True, False, False, False, False, False, True, False, False, False, False, False, True, False, False],
    'maybe_end': [False, False, False, False, False, False, True, False, False, False, False, False, True, False, False, True, False, False, True]
})

def apply_func(df):
    df['group2'] = np.nan
    counter = 0
    started = False
    for idx, row in df.iterrows():
        if row['maybe_start'] and not started:
            counter += 1
            started = True
        if started:
            df.loc[idx, 'group2'] = counter
        if row['maybe_end']:
            started = False
    return df

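# group_keys=False keeps the original row index instead of prepending the group key
df = df.groupby('group', group_keys=False).apply(apply_func)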

print(df)


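1 vote

I'm posting another answer because StackOverflow doesn't let me edit my old one.

As mentioned above, this code is hard to vectorize, but you can try using `numba` to speed up the computation: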

import numpy as np
from numba import njit


@njit
def numba_fn(out, x_start, x_end):
    g = 1.0
    state = 0
    for i in range(len(out)):
        start = x_start[i]
        if state == 0 and start:
            out[i] = g
            state = 1
        elif state == 1:
            out[i] = g
            end = x_end[i]
            if end:
                state = 0
                g += 1

def fn(x):
    x["group2"] = np.nan
    numba_fn(x["group2"].values, x["maybe_start"].values, x["maybe_end"].values)
    return x


print(df.groupby("group", group_keys=False).apply(fn))

Prints:

   group  maybe_start  maybe_end  group2
0    ABC        False      False     NaN
1    ABC         True      False     1.0
2    ABC        False      False     1.0
3    ABC        False      False     1.0
4    ABC         True      False     1.0
5    ABC        False      False     1.0
6    ABC        False       True     1.0
7    ABC        False      False     NaN
8    DEF        False      False     NaN
9    DEF        False      False     NaN
10   DEF         True      False     1.0
11   DEF        False      False     1.0
12   DEF        False       True     1.0
13   DEF        False      False     NaN
14   DEF        False      False     NaN
15   DEF        False       True     NaN
16   DEF         True      False     2.0
17   DEF        False      False     2.0
18   DEF        False       True     2.0

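However, the main bottleneck seems to be `.groupby()` itself (it creates a new DataFrame for each group, which slows the computation down).

You can use `np.bincount` to emulate `.groupby()` (assuming the DataFrame is sorted by `group`):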

d = np.bincount(
    pd.Categorical(df["group"]).codes,
)
d = [0, *d.cumsum()]

df["group2"] = np.nan

values_group2 = df["group2"].to_numpy()
values_start = df["maybe_start"].to_numpy()
values_end = df["maybe_end"].to_numpy()

for a, b in zip(d, d[1:]):
    numba_fn(values_group2[a:b], values_start[a:b], values_end[a:b])

print(df)
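
To make the slicing trick above easier to follow, here is a minimal sketch of how the `np.bincount` result turns into per-group slice boundaries. The `groups` column is a hypothetical toy example, and the sketch assumes, as the answer does, that rows are already sorted by group.

```python
import numpy as np
import pandas as pd

# Hypothetical toy column, already sorted by group (an assumption of this approach).
groups = pd.Series(["ABC"] * 3 + ["DEF"] * 2 + ["XYZ"] * 4)

# Integer code per row: 0 for "ABC", 1 for "DEF", 2 for "XYZ".
codes = pd.Categorical(groups).codes

# Number of rows in each group.
sizes = np.bincount(codes)

# Cumulative sizes, prefixed with 0, give the slice boundaries.
bounds = [0, *sizes.cumsum().tolist()]

# Consecutive pairs (a, b) are half-open slices [a, b), one per group.
slices = list(zip(bounds, bounds[1:]))
print(slices)  # [(0, 3), (3, 5), (5, 9)]
```

Each pair can then be used to slice the underlying NumPy arrays, so no per-group DataFrame has to be built.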

Benchmark (the DataFrame has 1000 groups with 1000 elements each):

from timeit import timeit
from numba import njit

import numpy as np
import pandas as pd


@njit
def numba_fn(out, x_start, x_end):
    g = 1.0
    state = 0
    for i in range(len(out)):
        start = x_start[i]
        if state == 0 and start:
            out[i] = g
            state = 1
        elif state == 1:
            out[i] = g
            end = x_end[i]
            if end:
                state = 0
                g += 1


def normal_fn(x):
    out, g, state = [], 1, False
    for start, end in zip(x.maybe_start, x.maybe_end):
        if not state and start:
            out.append(g)
            state = True
        elif state and end:
            out.append(g)
            state = False
            g += 1
        elif state:
            out.append(g)
        else:
            out.append(np.nan)

    x["group2"] = out
    return x


def fn(x):
    x["group2"] = np.nan
    numba_fn(x["group2"].values, x["maybe_start"].values, x["maybe_end"].values)
    return x


def test_groupby_normal_fn(df):
    return df.groupby("group", group_keys=False).apply(normal_fn)


def test_groupby_numba_fn(df):
    return df.groupby("group", group_keys=False).apply(fn)


def test_numpy_groupby_numba_fn(df):
    d = np.bincount(
        pd.Categorical(df["group"]).codes,
    )
    d = [0, *d.cumsum()]

    df["group2"] = np.nan

    values_group2 = df["group2"].to_numpy()
    values_start = df["maybe_start"].to_numpy()
    values_end = df["maybe_end"].to_numpy()

    for a, b in zip(d, d[1:]):
        numba_fn(values_group2[a:b], values_start[a:b], values_end[a:b])

    return df


def generate_df(num_groups=1000, elements_in_group=1000):
    from random import randint, seed

    seed(42)

    out = []
    for g in range(num_groups):
        for _ in range(elements_in_group):
            out.append((str(g), bool(randint(0, 1)), bool(randint(0, 1))))

    return pd.DataFrame(out, columns=["group", "maybe_start", "maybe_end"])


df = generate_df()

# test if the algorithm is correct:

df1 = test_groupby_normal_fn(df.copy())
df2 = test_groupby_numba_fn(df.copy())
df3 = test_numpy_groupby_numba_fn(df.copy())

np.testing.assert_equal(df1["group2"].values, df2["group2"].values)
np.testing.assert_equal(df1["group2"].values, df3["group2"].values)

t1 = timeit(
    "test_groupby_normal_fn(x)", setup="x=df.copy()", number=1, globals=globals()
)
t2 = timeit(
    "test_groupby_numba_fn(x)", setup="x=df.copy()", number=1, globals=globals()
)
t3 = timeit(
    "test_numpy_groupby_numba_fn(x)", setup="x=df.copy()", number=1, globals=globals()
)

print("test_groupby_normal_fn =", t1)
print("test_groupby_numba_fn =", t2)
print("test_numpy_groupby_numba_fn =", t3)

On my machine (AMD 5700x, python==3.11.4, pandas==2.0.3, numpy==1.24.4, numba==0.57.1) this prints:

test_groupby_normal_fn = 0.47242454695515335
test_groupby_numba_fn = 0.2934450509492308
test_numpy_groupby_numba_fn = 0.03471649601124227

The NumPy groupby + numba JIT version is about 14x faster than the plain `.groupby()` + `.apply()`.


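1 vote

In my opinion, the process in this case is hard to vectorize fully, because the computed value depends not only on the record itself but also on a varying number of neighbors. What we can do, though, is minimize the total number of comparisons, which helps when the sequences are very long and sparse.

First, let's drop `group` and work only with the `maybe_start` and `maybe_end` columns: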

import pandas as pd

data = {
    'maybe_start': [0,0,1,0,1,0,0,0,0,0,0,0,1,0,0,0,1,0,0],
    'maybe_end'  : [1,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,0,1]
}
df = pd.DataFrame(data, dtype=bool)

Now, the main idea is to split the index into intervals that can each be assigned a constant value, something like:

for mark, (start, stop) in enumerate(intervals, start=1):
    marks[start:stop] = mark
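
As a concrete illustration of the idea above, here is a small sketch using NumPy slice assignment. The `intervals` list is hypothetical and treated as half-open `[start, stop)`, which differs from the label-based, inclusive `.loc` slicing used later in this answer.

```python
import numpy as np

# Hypothetical half-open intervals [start, stop) over a 10-row index.
intervals = [(1, 4), (6, 9)]

# Rows outside every interval stay NaN.
marks = np.full(10, np.nan)

for mark, (start, stop) in enumerate(intervals, start=1):
    # One slice assignment per interval rather than one comparison per row.
    marks[start:stop] = mark

print(marks)
```

The loop runs once per interval, so its cost depends on the number of groups rather than the number of rows, which is why the approach pays off on long, sparse sequences.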

So let's get these intervals, assuming `df.index` is of type `RangeIndex`:

# extract all maybe_start points including the very last index
# which is needed to build a correct sequence of intervals
# between maybe_start points
start = df.index[df['maybe_start'] | pd.Series(True, df.index[-1:])]

# extract all maybe_end points
stop = df.index[df['maybe_end']]

Next, we group the `stop` points by the intervals between `start` points:

intervals = (
    stop
    .to_series()
    .groupby(pd.cut(stop, start))
    # get maybe_end which is closest to the left end of the current interval
    # I guess we could use .first() here as well because the index is sorted
    .min()
    # expand endpoints to the previous empty ranges
    .backfill()
)

# keep only left ends of intervals in index
# Note: all categories are used as index
#    and the index is sorted at this point,
#    that's why the following step is possible
intervals.index = intervals.index.categories.left

# get DataFrame({'first':..., 'last':...})
# where each record contains indexes of closed intervals
# which is going to be marked by the record's index
intervals = (
    intervals
    .drop_duplicates()   # keep the first by default (!)
    # values in this series are the right ends of intervals to mark
    .rename('last')      
    # indexes in the series are the left ends of intervals to mark 
    .rename_axis(index='first')
    .reset_index()
)

# start indexing of the final intervals from 1
intervals.index += 1

Here is what we get on the test data at this point:

With this, we can mark the data:

df['marks'] = float('nan')

for mark, (first, last) in intervals.iterrows():
    df.loc[first:last, 'marks'] = mark

This is what we have so far:

Now let's take the `group` column of the original data into account:

from io import StringIO

data = '''index   group  maybe_start  maybe_end
0    ABC        False      False
1    ABC         True      False
2    ABC        False      False
3    ABC        False      False
4    ABC         True      False
5    ABC        False      False
6    ABC        False       True
7    ABC        False      False
8    DEF        False      False
9    DEF        False      False
10   DEF         True      False
11   DEF        False      False
12   DEF        False       True
13   DEF        False      False
14   DEF        False      False
15   DEF        False       True
16   DEF         True      False
17   DEF        False      False
18   DEF        False       True
'''
original_df = pd.read_csv(StringIO(data), delim_whitespace=True, index_col=0)
# let's make group values not sorted in test_df
test_df = pd.concat([original_df, original_df]).reset_index(drop=True)

def markup(df: pd.DataFrame) -> pd.DataFrame:
    start = df.index[df['maybe_start'] | pd.Series(True, df.index[-1:])]
    stop = df.index[df['maybe_end']]
    intervals = (
        stop
        .to_series()
        .groupby(pd.cut(stop, start))
        .min()
        .backfill()
    )
    intervals.index = intervals.index.categories.left
    intervals = (
        intervals
        .drop_duplicates()
        .rename('last')      
        .rename_axis(index='first')
        .reset_index()
    )
    intervals.index += 1
    df['marks'] = float('nan')
    for mark, (first, last) in intervals.iterrows():
        df.loc[first:last, 'marks'] = mark
    return df

marked_test = (
    test_df
    .groupby('group', group_keys=False)
    .apply(markup)
)

Here is the final output:


python         : 3.11.0
pandas         : 1.5.1

0 votes

I'll offer a pandas solution that could probably be optimized and sped up if I put in the effort.

def test_pandas_fn(df):
    ng = (
        ~(df.groupby(df["group"])["maybe_start"].cummax() & df["maybe_end"].shift())
        & df.groupby("group")["maybe_start"].cummax()
    )
    df["ng"] = ng

    ngg = (~ng).cumsum()
    df["ngg"] = ngg

    df["g2"] = df.groupby(ngg)["maybe_start"].cummax()
    df["group2"] = (
        df[df["g2"]]
        .groupby("group")["ngg"]
        .apply(lambda x: x.astype("category").cat.codes + 1)
        .reset_index(level=0, drop=True)
    )
    return df
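
The `groupby(...).cummax()` calls in the function above act as a per-group "has a start been seen yet" flag. A minimal sketch of that building block on a hypothetical toy frame (the `toy` data and the `seen_start` column name are illustrative, not part of the answer):

```python
import pandas as pd

# Hypothetical toy data: two groups with one start each.
toy = pd.DataFrame({
    "group": ["A", "A", "A", "A", "B", "B", "B"],
    "maybe_start": [False, True, False, False, False, False, True],
})

# Running maximum of a boolean column within each group:
# False until the first True of that group, True from there on.
# astype(bool) guards against versions that return integers here.
toy["seen_start"] = toy.groupby("group")["maybe_start"].cummax().astype(bool)

print(toy)
```

The flag restarts at False when group "B" begins, which is what keeps a start in one group from leaking into the next.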

Using @Andrej Kesely's setup and tests:

test_groupby_normal_fn = 1.0834068000003754
test_groupby_numba_fn = 0.6451670000005834
test_numpy_groupby_numba_fn = 0.09050569999999425
test_pandas_fn = 1.2246184999985417

But let's look at the first 20 records:

   group  maybe_start  maybe_end     ng  ngg     g2  group2
0      0        False      False  False    1  False     NaN
1      0         True      False   True    1   True     1.0
2      0        False      False   True    1   True     1.0
3      0        False      False   True    1   True     1.0
4      0         True      False   True    1   True     1.0
5      0        False      False   True    1   True     1.0
6      0        False      False   True    1   True     1.0
7      0        False      False   True    1   True     1.0
8      0         True      False   True    1   True     1.0
9      0         True       True   True    1   True     1.0
10     0        False      False  False    2  False     NaN
11     0         True       True   True    2   True     2.0
12     0         True      False  False    3   True     3.0
13     0        False       True   True    3   True     3.0
14     0        False      False  False    4  False     NaN
15     0         True      False   True    4   True     4.0
16     0         True       True   True    4   True     4.0
17     0         True      False  False    5   True     5.0
18     0         True      False   True    5   True     5.0
19     0         True      False   True    5   True     5.0

In contrast to Andrej Kesely's result:

   group  maybe_start  maybe_end  group2
0      0        False      False     NaN
1      0         True      False     1.0
2      0        False      False     1.0
3      0        False      False     1.0
4      0         True      False     1.0
5      0        False      False     1.0
6      0        False      False     1.0
7      0        False      False     1.0
8      0         True      False     1.0
9      0         True       True     1.0
10     0        False      False     NaN
11     0         True       True     2.0
12     0         True      False     2.0 <- I think should be a new group
13     0        False       True     2.0
14     0        False      False     NaN
15     0         True      False     3.0
16     0         True       True     3.0
17     0         True      False     4.0
18     0         True      False     4.0
19     0         True      False     4.0

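0 votes

Two steps: determine presence, then assign group numbers.

Presence

  • The function next_true() lets us jump back and forth between the 'maybe_start' and 'maybe_end' values that are actually used, marking actual starts and presence along the way.
  • There is neither grouping nor full vectorization here, because the dependence on history defeated my attempts. At least the while loop does not iterate over the whole index.
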
# 1) Tools for readability

def next_true(df, col):
    '''
    Index of earliest next True in given column
    - df is a dataframe,
    - col is a column name
    '''
    return min(df.loc[df[col]].index)

def cumsumreset(signal_sum, signal_reset):
    '''
    Cumulative sum of signal_sum that resets whenever signal_reset hits False.
    signal_sum, signal_reset are dataframe columns
    '''
    return signal_sum.cumsum() - signal_sum.cumsum().where(~signal_reset).ffill().fillna(0).astype(int)

# 2) Identify actual starts and presence

df[['actual_start','present']] = False
j=0
while j < len(df)-1:
    i = next_true(df.iloc[j:],'maybe_start')
    j = next_true(df.iloc[i:],'maybe_end')
    df.loc[i  ,'actual_start'] = True
    df.loc[i:j,'present']      = True

Grouping

# 3) Flag group change (ABC --> DEF --> ...)
df['turnover'] = df['group'] != df['group'].shift(-1)  # True whenever the group is *about to* change.
df.loc[len(df)-1, 'turnover'] = False  # fix last row being True because of .shift() producing NaN.

# 4) Assign group number (based on cumulative sum resetting upon group change)
df['group2'] = cumsumreset(df['actual_start'], ~df['turnover'])

# 5) Finally remove values that need deleting (no presence)
df.loc[(~df['present']), 'group2'] = None

In the full output shown below, 'group2' matches 'expected' from the OP.

   group  maybe_start  maybe_end  expected  actual_start  present  turnover  group2
0    ABC        False      False       NaN         False    False     False     NaN
1    ABC         True      False       1.0          True     True     False     1.0
2    ABC        False      False       1.0         False     True     False     1.0
3    ABC        False      False       1.0         False     True     False     1.0
4    ABC         True      False       1.0         False     True     False     1.0
5    ABC        False      False       1.0         False     True     False     1.0
6    ABC        False       True       1.0         False     True     False     1.0
7    ABC        False      False       NaN         False    False      True     NaN
8    DEF        False      False       NaN         False    False     False     NaN
9    DEF        False      False       NaN         False    False     False     NaN
10   DEF         True      False       1.0          True     True     False     1.0
11   DEF        False      False       1.0         False     True     False     1.0
12   DEF        False       True       1.0         False     True     False     1.0
13   DEF        False      False       NaN         False    False     False     NaN
14   DEF        False      False       NaN         False    False     False     NaN
15   DEF        False       True       NaN         False    False     False     NaN
16   DEF         True      False       2.0          True     True     False     2.0
17   DEF        False      False       2.0         False     True     False     2.0
18   DEF        False       True       2.0         False     True     False     2.0
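
For readers who want to see what the `cumsumreset` helper does in isolation, here is a small worked sketch. The function body repeats the expression from the answer; the `counted` and `keep` series are hypothetical inputs chosen for illustration.

```python
import pandas as pd


def cumsumreset(signal_sum, signal_reset):
    # Same expression as in the answer: subtract the running total
    # recorded at the most recent row where signal_reset was False.
    total = signal_sum.cumsum()
    return total - total.where(~signal_reset).ffill().fillna(0).astype(int)


# Hypothetical inputs: count the True values, reset where keep is False.
counted = pd.Series([True, False, True, False, True])
keep = pd.Series([True, True, False, True, True])

result = cumsumreset(counted, keep)
print(result.tolist())  # [1, 1, 0, 0, 1]
```

Note that the row where the reset happens comes out as 0 even though `counted` is True there, so a True on a reset row is absorbed rather than counted. In the answer the reset falls on the last row of each group, so this would only matter if an actual start landed on that row.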
    