在多分区Dask数据帧中的组内查找最大值最小值

问题描述 投票:0回答:1

在处理非常大的数据帧时,我很难利用Dask分区。想象一下一个200GB的csv,其中包含出租车行程的日志。我像这样加载数据:

df = dd.read_csv("/data/taxi_data_big.tsv", sep="\t")

然后,我想为每个要找出最早去机场的司机(DestinationId == 7)。

df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]]

df1数据帧看起来像:

2020-01-01    D1    T1    8    7
2020-01-01    D1    T2    11   7
2020-01-01    D1    T3    44   7
2020-01-02    D1    T4    8    7
2020-01-02    D1    T5    13   7
2020-01-01    D2    T77   20   7
2020-01-01    D2    T177  76   7

2020年1月1日,驾驶员D2的第20和第76趟飞机前往机场。

为了进行分析,我需要找到驾驶员在前往机场之前的平均旅行次数。

df2 = df1.groupby('TripId').TripId_Rank.idxmin()将给我TripId和第一次去机场的索引。

df4 = df2.loc[df3]选择匹配的行。这适用于小型数据集,但是当我移至大型数据集时,我得到了"ValueError: Not all divisions are known, can't align partitions" when performing math on dataframe column.

如果我的理解是正确的,则错误是由于将数据框加载到多个分区而引起的,并且Dask文档要求在数据框上设置显式索引。

df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]].compute()

df1['id'] = np.arange(len(df2)) # explicitly add index column to the dataframe
df1 = df1.set_index("id") # is this really necessary? This takes hours to complete

df2 = df1.groupby('TripId').TripId_Rank.idxmin()
df4 = df2.loc[df3]
df

上面的代码有效,但是我想知道是否有更好的解决方案。将id列添加到数据帧中真的很慢,我不确定上面的代码是否利用了Dask并行化。

提前感谢。

dask dask-distributed dask-dataframe
1个回答
0
投票

一种可行的方法是对分组数据使用Apply。

df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]]

df2['idx'] = df2.index

def foo(grouped_df):
    row_with_min_cnt_index = grouped_df['Impression_Rank'].idxmin()
    row_with_min_cnt = grouped_df.loc[row_with_min_cnt_index]
    return row_with_min_cnt['idx']

keep_ids = df2.groupby('DriverId').apply(foo, meta=('x', 'f8')).compute()
df2[df2['idx'].isin(keep_ids)].compute()

但是请注意,将'idx'列添加到现有数据框中将花费很长时间。

© www.soinside.com 2019 - 2024. All rights reserved.