我重新发布这个问题是因为我仍在寻找更快的实现(现在使用极坐标)。我使用 Polars 代替 pandas 将速度提高了大约 10 倍。但是,我需要一种更有效的方法来运行此功能。 问题是我有两个嵌套的 for 循环,第二个 for 循环有一个条件(以另一个 df 为条件)。
设置是相同的。我有一个数据集,其中包含多个ID、值和日期(大T,大N)。此外,还有其他数据集,SB_dates 具有随机日期(以块为单位)和 newBL,一个与 SB_dates 大小相同的布尔数组矩阵。
对于每个新块,我需要选择 5 (n) 个随机 ID 并计算该日期的平均值。如果它不是一个新块,我应该保留之前选择的相同随机 ID。
使用极坐标,我已经编码了这样的函数,其中 data 和 SB_dates 作为 pl.DataFrames。
def get_mean_chrono_polars(data, SB_dates, newBL, n=5):
n_rows, n_columns = SB_dates.shape
df_sb = pl.DataFrame()
for col in range(n_columns):
date_column = pl.DataFrame( SB_dates[:, col])
newBL_column = newBL[:, col]
mean_values_col = []
for i in range(n_rows):
filter_ids=(data
.select(pl.col("date",'values','ids'))
.filter(pl.col("date") == date_column[i,:]))
if newBL_column[i]:
random_ids=filter_ids.select(pl.col("ids").shuffle(seed=1)).limit(n)
selected_ids_df = (
filter_ids
.select(pl.col("date", 'values', 'ids'))
.filter(pl.col('ids').is_in(random_ids['ids']))
)
mean_values = selected_ids_df['values'].mean()
mean_values_col.append(mean_values)
mean_values_col=pl.Series(str(col),mean_values_col)
df_sb=df_sb.hstack([mean_values_col])
return df_sb
r=get_mean_chrono_polars(data, SB_dates, newBL, n=5)
数据
import polars as pl
import random
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
n_rows = 1000 # You can adjust this as needed
start_date = datetime(2000, 1, 1)
end_date = datetime(2023, 12, 31)
dates = [
start_date + timedelta(days=random.randint(0, (end_date - start_date).days))
for _ in range(n_rows)
]
unique_ids = [random.randint(0, 100) for _ in range(n_rows)]
returns = [random.uniform(-0.01, 0.01) for _ in range(n_rows)]
data = pl.DataFrame({'date': dates, 'values': returns, 'ids': unique_ids})
data=data.with_columns(date=pl.col("date").dt.month_end())
def SB(data, B, BL, H):
# Define the probability of a new block
p = 1 / BL
# Initialize the indices matrix
indices = np.zeros((H, B), dtype=int)
# Generate random indices for the first row
indices[0, :] = np.ceil(H * np.random.rand(1, B))
# Set up the random numbers
#return mask as newBL
mask = np.random.rand(H, B) < p
nBL = (mask).cumsum(axis=0)
newBL=mask
newBL[0, :] = True
# Loop through time steps to fill in the indices
for i in range(1, H):
# Determine whether we stay or move to a new starting value
indices[i, mask[i, :]] = np.ceil(H * np.random.rand(1, np.sum(mask[i, :])))
indices[i, ~mask[i, :]] = indices[i - 1, ~mask[i, :]] + 1
# Wrap around if the indices exceed the length of the data
indices[i, indices[i, :] > H] -= H
newBL[i, indices[i, :] > H] = True
# The indices make finding the bsdata simple
bsdata = data[indices.flatten()]
# Reshape bsdata to match the shape of indices
bsdata = np.reshape(bsdata, indices.shape)
# Convert bsdata to a DataFrame
bsdata = pd.DataFrame(bsdata)
newBL[bsdata == "2000-01-31 00:00:00"] = True
return bsdata, indices, nBL, newBL
BL=30 # Block-length
unique_dates=data['date'].unique()
SB_dates, index, nbL, newBL = SB(unique_dates, B=100, BL=BL, H=180)
SB_dates=pl.DataFrame(SB_dates)
再次抱歉,帖子太长了。 以下是原始帖子的链接:通过删除 col 和 row for 循环(带条件的循环)来加速
我想我能够将其减少到一个循环,也许也能够将其减少到零。这更像是一个通用的方法答案,希望它有用:
SB_dates
和 newBL
合并到一个 DataFrame 中可能是最好的选择,这样我们就可以在一个地方完成所有计算。如果你真的想要的话,还有 LazyFrame 的 with_context
。data
的左连接(维护左 DataFrame 的顺序)可以完成所有 filter_ids = ...
步骤。group_by
进一步准备好“在当天选择 n
随机 ID”步骤。maintain_order=True
,可以在pl.when
区域中使用agg
来消除上述步骤中任何不需要的计算。list.to_struct
解除列表的嵌套,然后需要 unnest
来避免空值列表/空值结构 - 如果 null
是 newBL
,则每个新列只需一个 False
。strategy
上使用 forward
的 fill_nulls
。这将为您提供给定列的
mean_val
,该列可以水平concat
启用(剩下的一个循环)。
import polars as pl
import random
from datetime import datetime, timedelta
n_rows = 100000
start_date = date(2000, 1, 1)
end_date = date(2023, 12, 31)
random.seed(1)
# Generate data
dates = [
start_date + timedelta(days=random.randint(0, (end_date - start_date).days))
for _ in range(n_rows)
]
sb_dates = [
[
start_date + timedelta(days=random.randint(0, (end_date - start_date).days))
for _ in range(20)
]
for _ in range(6)
]
returns = [random.uniform(-0.01, 0.01) for _ in range(n_rows)]
ids = random.sample(range(n_rows * 1000), n_rows)
# Create a Polars DataFrame
data = pl.DataFrame({'date': dates, 'values': returns, 'ids': ids})
sb_data = pl.DataFrame({f'bd{i}': sb_dates[i] for i in range(6)}).with_columns(
(pl.all().dt.month_end())
)
blnew_data = pl.DataFrame(
{
f'bn{i}': [random.random() < 0.5 if j else True for j in range(20)]
for i in range(6)
}
)
def get_mean_chrono_polars(data, SB_dates, newBL, n=5):
static_data = pl.concat((SB_dates, newBL), how='horizontal')
return pl.concat(
(
static_data.select(cs.ends_with(f'{i}'))
.join(data, left_on=f'bd{i}', right_on='date', how='left')
.group_by(f'bd{i}', f'bn{i}', maintain_order=True)
.agg(pl.when(pl.col(f'bn{i}')).then(pl.col('ids', 'values').sample(n)))
.with_columns(
pl.col('ids').list.to_struct(
n_field_strategy='max_width',
fields=[f'rid{j}' for j in range(n)],
upper_bound=n,
),
pl.col('values').list.to_struct(
n_field_strategy='max_width',
fields=[f'val{j}' for j in range(n)],
upper_bound=n,
),
)
.unnest('ids', 'values')
.select(
(pl.sum_horizontal(cs.starts_with('val')) / n)
.fill_null(strategy='forward')
.alias(f'{i}')
)
for i in range(n+1)
),
how='horizontal',
)
get_mean_chrono_polars(data, sb_data, blnew_data)