Dask数据帧基于列或函数拆分分区

问题描述 投票:3回答:2

我最近开始查看Dask的大数据。我有一个关于并行有效应用操作的问题。

假设我有一些这样的销售数据:

customerKey    productKey    transactionKey    grossSales  netSales      unitVolume    volume transactionDate
-----------  --------------  ----------------  ----------  --------      ----------    ------ --------------------
    20353           189            219548     0.921058     0.921058              1         1  2017-02-01 00:00:00
  2596618           189            215015     0.709997     0.709997              1         1  2017-02-01 00:00:00
 30339435           189            215184     0.918068     0.918068              1         1  2017-02-01 00:00:00
 32714675           189            216656     0.751007     0.751007              1         1  2017-02-01 00:00:00
 39232537           189            218180     0.752392     0.752392              1         1  2017-02-01 00:00:00
 41722826           189            216806     0.0160143    0.0160143             1         1  2017-02-01 00:00:00
 46525123           189            219875     0.469437     0.469437              1         1  2017-02-01 00:00:00
 51024667           189            215457     0.244886     0.244886              1         1  2017-02-01 00:00:00
 52949803           189            215413     0.837739     0.837739              1         1  2017-02-01 00:00:00
 56526281           189            220261     0.464716     0.464716              1         1  2017-02-01 00:00:00
 56776211           189            220017     0.272027     0.272027              1         1  2017-02-01 00:00:00
 58198475           189            215058     0.805758     0.805758              1         1  2017-02-01 00:00:00
 63523098           189            214821     0.479798     0.479798              1         1  2017-02-01 00:00:00
 65987889           189            217484     0.122769     0.122769              1         1  2017-02-01 00:00:00
 74607556           189            220286     0.564133     0.564133              1         1  2017-02-01 00:00:00
 75533379           189            217880     0.164387     0.164387              1         1  2017-02-01 00:00:00
 85676779           189            215150     0.0180961    0.0180961             1         1  2017-02-01 00:00:00
 88072944           189            219071     0.492753     0.492753              1         1  2017-02-01 00:00:00
 90233554           189            216118     0.439582     0.439582              1         1  2017-02-01 00:00:00
 91949008           189            220178     0.1893       0.1893                1         1  2017-02-01 00:00:00
 91995925           189            215159     0.566552     0.566552              1         1  2017-02-01 00:00:00

我想做几个不同的groupbys,首先是groupby-apply on customerKey。然后是customerKey上的另一个groupby-sum,以及将作为previos groupby的结果的列。

我能想到的最有效的方法是将这个数据帧拆分成客户键块的分区。因此,例如,我可以使用分区方案将数据帧拆分为4个块,例如像(伪代码)

按customerKey%4分区

然后我可以使用map_partitions来执行这些组,通过应用每个分区,然后最终返回结果。然而,似乎dask迫使我为每个组织做一次洗牌。

是否无法根据列的值进行重新分区?

目前,在仅约80,000行的数据帧上,需要约45秒,4名工人。我计划将其扩展到数万亿行的数据框,并且看起来这似乎是可怕的扩展。

我错过了Dask的基本功能吗?

python pandas dataframe dask dask-distributed
2个回答
5
投票

您可以将列设置为索引

df = df.set_index('customerKey')

这将按该列对数据进行排序,并跟踪哪个值在哪个分区中。正如您所说,这可能是一项昂贵的操作,您可能希望将其保存在某个地方

无论是在记忆中

df = df.persist()

或在磁盘上

df.to_parquet('...')
df = df.read_parquet('...')

0
投票

将index设置为所需列,map_partitions与groupby相比效率更高

© www.soinside.com 2019 - 2024. All rights reserved.