Dask DataFrame mode under groupby?


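I am trying to extract the mode of a series under a groupby aggregation in a dask dataframe. I can find the documentation for mode, but not how to use it under a groupby.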

import pandas as pd
import numpy as np
data = pd.DataFrame({
    'status' :  ['pending', 'pending','pending', 'canceled','canceled','canceled', 'confirmed', 'confirmed','confirmed'],
    'clientId' : ['A', 'B', 'C', 'A', 'D', 'C', 'A', 'B','C'],
    'partner' :  ['A', np.nan,'C', 'A',np.nan,'C', 'A', np.nan,'C'],
    'product' : ['afiliates', 'pre-paid', 'giftcard','afiliates', 'pre-paid', 'giftcard','afiliates', 'pre-paid', 'giftcard'],
    'brand' : ['brand_1', 'brand_2', 'brand_3','brand_1', 'brand_2', 'brand_3','brand_1', 'brand_3', 'brand_3'],
    'gmv' : [100,100,100,100,100,100,100,100,100]})

data = data.astype({'partner':'category','status':'category','product':'category', 'brand':'category'})

import dask.dataframe as dd
df = dd.from_pandas(data,npartitions=1)
result = df.groupby(['clientId', 'product'], observed=True).aggregate({'brand':'mode'})
result.compute()

Thanks!

python pandas group-by dask dask-dataframe
1 Answer

This answer is based on the code provided here, with a few modifications:

from pandas import DataFrame, Series, NA
from dask.dataframe import from_pandas, Aggregation

data = DataFrame(
    {
        "status": [
            "pending",
            "pending",
            "pending",
            "canceled",
            "canceled",
            "canceled",
            "confirmed",
            "confirmed",
            "confirmed",
        ],
        "clientId": ["A", "B", "C", "A", "D", "C", "A", "B", "C"],
        "partner": ["A", NA, "C", "A", NA, "C", "A", NA, "C"],
        "product": [
            "afiliates",
            "pre-paid",
            "giftcard",
            "afiliates",
            "pre-paid",
            "giftcard",
            "afiliates",
            "pre-paid",
            "giftcard",
        ],
        "brand": [
            "brand_4",
            "brand_2",
            "brand_3",
            "brand_1",
            "brand_2",
            "brand_3",
            "brand_1",
            "brand_3",
            "brand_3",
        ],
        "gmv": [100, 100, 100, 100, 100, 100, 100, 100, 100],
    }
)

data = data.astype(
    {
        "partner": "category",
        "status": "category",
        "product": "category",
        "brand": "category",
    }
)
mode_pandas = data.groupby(["clientId", "product"], observed=True).agg(
    {"brand": Series.mode}
)

df = from_pandas(data, npartitions=1)


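def chunk(s):
    # Runs on each partition: count occurrences of every value within each group.
    return s.value_counts()


def agg(s0):
    # Combine the per-partition counts by summing over all index levels
    # (the group keys plus the value itself). Note that _selected_obj is a
    # private pandas attribute and may change between versions.
    _intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
    # Drop zero counts, which can appear for categorical values not seen in a group.
    _intermediate = _intermediate[_intermediate > 0]
    return _intermediate


def finalize(s):
    # Group by every index level except the last (the value level) and keep
    # only the value(s) whose count equals the group's maximum, i.e. the mode(s).
    level = list(range(s.index.nlevels - 1))
    return s.groupby(level=level, group_keys=False).apply(lambda s: s[s == s.max()])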


mode = Aggregation(
    name="mode",
    chunk=chunk,
    agg=agg,
    finalize=finalize,
)


mode_dask = df.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
    {"brand": mode}
).compute()

print(mode_pandas)
print(mode_dask)
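
To see what the three stages above do, here is a minimal pandas-only sketch that imitates them by hand on two made-up partitions. The frames `part1` and `part2` and the column name `key` are hypothetical and not part of the original data; this only illustrates the count, sum, and keep-the-maximum idea, not how dask schedules the work internally.

```python
import pandas as pd

# Two hypothetical partitions (illustrative data, not from the question).
part1 = pd.DataFrame({"key": ["a", "a", "b"], "brand": ["x", "y", "x"]})
part2 = pd.DataFrame({"key": ["a", "b", "b"], "brand": ["x", "z", "z"]})

# chunk: per-partition counts, indexed by (key, brand).
chunks = [p.groupby("key")["brand"].value_counts() for p in (part1, part2)]

# agg: concatenate the partial counts and sum them per (key, brand).
combined = pd.concat(chunks)
totals = combined.groupby(level=[0, 1]).sum()

# finalize: within each key, keep the brand(s) whose total equals the maximum.
is_max = totals == totals.groupby(level=0).transform("max")
modes = totals[is_max]
print(modes)
```

Because the counts are summed before the maximum is taken, a value that is not the most frequent in any single partition can still end up as the overall mode.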

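Note that the dask version does not produce exactly the same output as the pandas version, but reconciling the two is left as an interesting exercise for the reader.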
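
As one possible starting point for that exercise: the `finalize` step above returns counts indexed by the group keys plus the value, while the pandas result holds the modal values themselves. The sketch below assumes a result of that shape (the `counts` series is a hand-built stand-in, not actual dask output) and collects the modal values into a list per group. Using lists for both unique modes and ties is a choice made here for illustration, not something the original answer prescribes.

```python
import pandas as pd

# Hypothetical stand-in for the dask result: counts indexed by
# (clientId, product, brand), with a tie for client B.
idx = pd.MultiIndex.from_tuples(
    [
        ("A", "afiliates", "brand_1"),
        ("B", "pre-paid", "brand_2"),
        ("B", "pre-paid", "brand_3"),
    ],
    names=["clientId", "product", "brand"],
)
counts = pd.Series([2, 1, 1], index=idx, name="count")

# Turn the index levels into columns, then gather the modal brands per group.
modes = (
    counts.index.to_frame(index=False)
    .groupby(["clientId", "product"])["brand"]
    .agg(list)
)
print(modes)
```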
