I wrote a lambda function for a Pandas aggregation. How can I implement the same lambda in a Dask aggregation?


I wrote a custom lambda function that needs to be applied after a groupby operation on a dataframe. The lambda joins all **unique** strings within each group using a separator (e.g. ", "). I am trying to implement the same functionality with Python's Dask library, but I get the error shown below. Can anyone guide me on how to implement this lambda in Dask?

Implementation in Pandas:

import pandas as pd

A = pd.DataFrame(data = {"A": ["saad", "saad", "saad", "saad", "nimra", "asad", "nimra", "nimra", "asad"],
                         "B": ["hello", "hello", "saad", "whatsup?", "yup", "nup", "saad", "saad", "nup"],
                         "C": ["hello", "hello", "saad", "whatsup?", "yup", "nup", "saad", "saad", "nup"]
                        }
                )
A.groupby("A")["B"].unique().apply(', '.join)           # single column
A.groupby("A").agg(lambda s: ', '.join(s.unique()))     # all columns

This code works perfectly fine and produces the correct output:
        B                       C
A       
asad    nup                     nup
nimra   yup, saad               yup, saad
saad    hello, saad, whatsup?   hello, saad, whatsup?



Implementation in Dask

I tried to implement it in Dask with the following code:

import dask.dataframe as dd

A_1 = dd.from_pandas(A, npartitions=2)
A_1.groupby("A").agg(lambda s: ', '.join(s.unique()))


However, the following error occurs:
ValueError                                Traceback (most recent call last)
Cell In[20], line 1
----> 1 A_1.groupby("A").agg(lambda s: ', '.join(s.unique()))

File /opt/miniconda/lib/python3.8/site-packages/dask/dataframe/groupby.py:369, in numeric_only_not_implemented.<locals>.wrapper(self, *args, **kwargs)
    359             if (
    360                 PANDAS_GT_150
    361                 and not PANDAS_GT_200
    362                 and numeric_only is no_default
    363             ):
    364                 warnings.warn(
    365                     "The default value of numeric_only will be changed to False "
    366                     "in the future when using dask with pandas 2.0",
    367                     FutureWarning,
    368                 )
--> 369 return func(self, *args, **kwargs)

File /opt/miniconda/lib/python3.8/site-packages/dask/dataframe/groupby.py:2832, in DataFrameGroupBy.agg(self, arg, split_every, split_out, shuffle, **kwargs)
   2829 @_aggregate_docstring(based_on="pd.core.groupby.DataFrameGroupBy.agg")
   2830 @numeric_only_not_implemented
   2831 def agg(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs):
-> 2832     return self.aggregate(
   2833         arg=arg,
   2834         split_every=split_every,
   2835         split_out=split_out,
   2836         shuffle=shuffle,
   2837         **kwargs,
   2838     )

File /opt/miniconda/lib/python3.8/site-packages/dask/dataframe/groupby.py:2821, in DataFrameGroupBy.aggregate(self, arg, split_every, split_out, shuffle, **kwargs)
   2818 if arg == "size":
   2819     return self.size()
-> 2821 return super().aggregate(
   2822     arg=arg,
   2823     split_every=split_every,
   2824     split_out=split_out,
   2825     shuffle=shuffle,
   2826     **kwargs,
   2827 )

File /opt/miniconda/lib/python3.8/site-packages/dask/dataframe/groupby.py:2248, in _GroupBy.aggregate(self, arg, split_every, split_out, shuffle, **kwargs)
   2245 else:
   2246     raise ValueError(f"aggregate on unknown object {self.obj}")
-> 2248 chunk_funcs, aggregate_funcs, finalizers = _build_agg_args(spec)
   2250 if isinstance(self.by, (tuple, list)) and len(self.by) > 1:
   2251     levels = list(range(len(self.by)))

File /opt/miniconda/lib/python3.8/site-packages/dask/dataframe/groupby.py:951, in _build_agg_args(spec)
    948 if not isinstance(func, Aggregation):
    949     func = funcname(known_np_funcs.get(func, func))
--> 951 impls = _build_agg_args_single(
    952     result_column, func, func_args, func_kwargs, input_column
    953 )
    955 # overwrite existing result-columns, generate intermediates only once
    956 for spec in impls["chunk_funcs"]:

File /opt/miniconda/lib/python3.8/site-packages/dask/dataframe/groupby.py:1010, in _build_agg_args_single(result_column, func, func_args, func_kwargs, input_column)
   1007     return _build_agg_args_custom(result_column, func, input_column)
   1009 else:
-> 1010     raise ValueError(f"unknown aggregate {func}")

ValueError: unknown aggregate lambda
python pandas aggregate dask dask-distributed
1 Answer

You can use the dask.dataframe apply function instead.

import pandas as pd
from dask import dataframe as dd

def agg_fn(x):
    # Join the unique strings of each column within the group
    return pd.Series(
        dict(
            B = ', '.join(x['B'].unique()),
            C = ', '.join(x['C'].unique())
        )
    )

A_1.groupby('A').apply(agg_fn, meta=pd.DataFrame(columns=['B', 'C'], dtype=str)).compute()