对大数据(~150MM+行)进行分组和分析的最快方法是什么?

问题描述 投票:0回答:1

我有大量历史电价数据集(151mm+)。有 18,065 个单独的节点进行价格结算,每个节点每小时进行一次观测(8760 个/年)。

数据模式:节点 ID (int64)、日期时间 (datetime) 和价格 (float64)。节点 ID 是 int64、7 位长的密钥。日期时间是日期时间。价格为浮动64。

对于每个节点,我需要年平均波动率如下:

  • 全天平均(最高 4 小时)- 平均(最低 4 小时)
  • 每年每个节点获得一个数据点的所有每日传播的平均值

我尝试过pd.groupby和nsmallest/nlargest的各种方法。我尝试对整个数据帧进行分组,然后对数据进行分块,并使用每个唯一的节点 ID 进行子集化。我能做的最好的事情是使用下面的代码,平均每秒大约 2 个节点 ID,总共大约 2.5 小时。

dd = pd.read_csv(".\ERCOT LMP 2023.csv")
n = 2
dd['Local Datetime (Hour Ending)'] = pd.to_datetime(dd['Local Datetime (Hour Ending)'], format = "%m/%d/%Y %H:%M")
node_ix = dd['Price Node ID'].unique()
dd.drop('Price Node Name', axis = 1, inplace = True)
def tb4(df, n):
    grp = df.groupby(pd.Grouper(key = 'Local Datetime (Hour Ending)', freq = 'D'))
    high = grp['Price $/MWh'].nlargest(n).reset_index().drop('level_1', axis = 1).groupby(pd.Grouper(key = 'Local Datetime (Hour Ending)', freq = 'D'))['Price $/MWh'].mean()
    low = grp['Price $/MWh'].nsmallest(n).reset_index().drop('level_1', axis = 1).groupby(pd.Grouper(key = 'Local Datetime (Hour Ending)', freq = 'D'))['Price $/MWh'].mean()
    return high - low    

results = []
time_start = dt.datetime.now()
time_log = []

for i in range(len(node_ix)):
    loop_start = dt.datetime.now()
    tmp = dd[dd['Price Node ID'] == node_ix[i]].copy()
    tmp.drop('Price Node ID', axis = 1, inplace = True)
    results += [(node_ix[i], tb4(tmp, n).mean())]
    loop_end = dt.datetime.now()
    time_log += [loop_end-loop_start]
    avg_time = sum(time_log,dt.timedelta()) / len(time_log)
    print(f"{dt.datetime.now().strftime(format = '%H:%M:%S')} -- {i+1}/{len(node_ix)} -- Estimated finish: {(avg_time * (len(node_ix) - i)  + time_start).strftime(format = '%H:%M:%S')}")

MRE 的最佳尝试如下所示,目标是获得大量个人 ID 的

diff
,假设它们都拥有一年的每小时数据。

dates = pd.Series(pd.date_range('2020-01-01', '2020-12-31', freq = 'H'))
df1 = pd.DataFrame({'datetime' : dates,
                  'id': 1,
                  'price': np.random.uniform(-20.00, 200.00, size = (len(dates)))})

df2 = pd.DataFrame({'datetime' : dates,
                  'id': 2,
                  'price': np.random.uniform(-20.00, 200.00, size = (len(dates)))})
df = pd.concat([df1, df2], axis = 0)

top = df.groupby(['id', pd.Grouper(key = 'datetime', freq = 'D')])['price'].nlargest(2).reset_index().drop('level_2', axis = 1).groupby(['id', pd.Grouper(key = 'datetime', freq = 'D')])['price'].mean()
bottom = df.groupby(['id', pd.Grouper(key = 'datetime', freq = 'D')])['price'].nsmallest(2).reset_index().drop('level_2', axis = 1).groupby(['id', pd.Grouper(key = 'datetime', freq = 'D')])['price'].mean()
diff = (top - bottom).reset_index().groupby(['id', pd.Grouper(key = 'datetime', freq = 'Y')]).mean()

这是一个数据样本:

Data Sample

我正在迭代唯一的节点列表。

python optimization group-by large-data
1个回答
0
投票

因此,这可能会在消除一些不必要的函数调用并尝试简化数据访问方面提供一些加速:

import pandas as pd
import datetime as dt

def tb4(df: pd.DataFrame, n: int) -> pd.Series: # added types just to make it more readable of what you are dealing with
    grp = df.groupby(pd.Grouper(key = 'Local Datetime (Hour Ending)', freq = 'D'))['Price $/MWh'] # added selector at end because both high + low used so store value once and reuse
    high = grp.nlargest(n).reset_index()['Price $/MWh'].mean() # removed .drop('level_1', axis = 1).groupby(pd.Grouper(key = 'Local Datetime (Hour Ending)', freq = 'D')) because all it seems to do is drop a part of the table (which doesn't matter as you are only looking at a portion of it anyways and not reusing it elsewhere), and then having another groupby which does the same thing as the first grouping having no real impact and just extra run time
    low = grp.nsmallest(n).reset_index()['Price $/MWh'].mean() # same as for high
    return high - low


dd = pd.read_csv("/Users/vortex/Downloads/ERCOT LMP 2023.csv")
dd['Local Datetime (Hour Ending)'] = pd.to_datetime(dd['Local Datetime (Hour Ending)'], format = "%m/%d/%Y %H:%M")
node_ix = dd['Price Node ID'].unique()
#dd.drop('Price Node Name', axis = 1, inplace = True)

results = []
time_start = dt.datetime.now()
time_log = []
n = 2

for node in node_ix: # updated to node so that you access the values directly instead of trying to index into the value (faster generally and preferred)
    loop_start = dt.datetime.now()
    tmp = dd[dd['Price Node ID'] == node].copy()
    tmp.drop('Price Node ID', axis = 1, inplace = True)
    results.append((node, tb4(tmp, n))) # removed .mean() because already getting a value from func tb4 - just need to access it # removed += to append because += is a slower method than append (for lists)
    loop_end = dt.datetime.now()
    time_log.append(loop_end-loop_start) # removed += to append because += is a slower method than append (for lists)
    avg_time = sum(time_log,dt.timedelta()) / len(time_log)
    print(f"{dt.datetime.now().strftime(format = '%H:%M:%S')} -- {i+1}/{len(node_ix)} -- Estimated finish: {(avg_time * (len(node_ix) - i)  + time_start).strftime(format = '%H:%M:%S')}")

注意,我没有使用完整的示例数据对其进行测试(因为它尚未采用您正在使用的格式)

如果需要进一步改进,我建议对您的代码进行采样,看看哪些区域最需要关注来改进时序。

© www.soinside.com 2019 - 2024. All rights reserved.