在共享索引上合并两个 DataFrame 时出现 Dask ValueError

问题描述 投票:0回答:1

当我尝试在共享索引上合并两个 Dask DataFrame 时，遇到了 `ValueError`。如果不初始化 `Cluster` 和 `Client`，代码可以正常运行。

在 Dask 2023.6.0 中重现的步骤:

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import pandas as pd
from dask import delayed

# Start a local distributed cluster with two workers and attach a client to it.
# Per the report above, the merge further down works when these two lines are omitted.
cluster = LocalCluster(n_workers=2)
client = Client(cluster)

def create_part(i, col):
    """Build one 10-row pandas partition.

    The frame has a single column named ``col`` holding the integers 0..9,
    and a string index whose labels are ``"{i}_{j}"`` for ``j`` in 0..9.
    """
    row_count = 10
    labels = [f"{i}_{j}" for j in range(row_count)]
    values = list(range(row_count))
    return pd.DataFrame(data={col: values}, index=labels)

# Build two Dask DataFrames, each from three delayed pandas partitions
# (column "a" for df1, column "b" for df2); no meta or divisions are passed.
df1 = dd.from_delayed([delayed(create_part)(i, "a") for i in range(3)])
df2 = dd.from_delayed([delayed(create_part)(i, "b") for i in range(3)])

# Index-on-index merge; computing it is what produces the failure reported below.
df1.merge(df2, left_index=True, right_index=True).compute()

这会产生以下输出:

2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key:       ('create_part-assign-927e22614a94e66b3d2b93debcd761ca', 1)
Function:  execute_task
args:      ((subgraph_callable-16a882c1-957b-443c-864a-9cf3a3577335, '__hash_partition', 'getitem-fff73ecfd4dfe9d1b867611d90a36992', [], (<function create_part at 0x7f41bc80b6d0>, 1, 'b')))
kwargs:    {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"

2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key:       ('create_part-assign-927e22614a94e66b3d2b93debcd761ca', 0)
Function:  execute_task
args:      ((subgraph_callable-16a882c1-957b-443c-864a-9cf3a3577335, '__hash_partition', 'getitem-fff73ecfd4dfe9d1b867611d90a36992', [], (<function create_part at 0x7f41bc80bd00>, 0, 'b')))
kwargs:    {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"

2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key:       ('create_part-assign-927e22614a94e66b3d2b93debcd761ca', 2)
Function:  execute_task
args:      ((subgraph_callable-16a882c1-957b-443c-864a-9cf3a3577335, '__hash_partition', 'getitem-fff73ecfd4dfe9d1b867611d90a36992', [], (<function create_part at 0x7f41bc840040>, 2, 'b')))
kwargs:    {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"

2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key:       ('create_part-assign-09b3043c2ae5ff26d93dd8951c816b5b', 2)
Function:  execute_task
args:      ((subgraph_callable-d50b8eb3-a0ef-454a-9a52-d58a57c3b693, '__hash_partition', 'getitem-9b88d3b8f6cca25fbd56e9023914c872', [], (<function create_part at 0x7f41bc80beb0>, 2, 'a')))
kwargs:    {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"

2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key:       ('create_part-assign-09b3043c2ae5ff26d93dd8951c816b5b', 0)
Function:  execute_task
args:      ((subgraph_callable-d50b8eb3-a0ef-454a-9a52-d58a57c3b693, '__hash_partition', 'getitem-9b88d3b8f6cca25fbd56e9023914c872', [], (<function create_part at 0x7f41bc80b9a0>, 0, 'a')))
kwargs:    {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"

2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key:       ('create_part-assign-09b3043c2ae5ff26d93dd8951c816b5b', 1)
Function:  execute_task
args:      ((subgraph_callable-d50b8eb3-a0ef-454a-9a52-d58a57c3b693, '__hash_partition', 'getitem-9b88d3b8f6cca25fbd56e9023914c872', [], (<function create_part at 0x7f41bc80b7f0>, 1, 'a')))
kwargs:    {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"
python dask
1个回答
0
投票

我在 Dask 2023.5.1 上遇到了同样的问题。通过研究您的最小示例，我发现先重置索引、再显式地重新设置索引可以绕过这个问题。这样做开销很大，但这是目前唯一能让它正常工作的办法。

# Workaround: round-trip the index of both frames through reset_index() / set_index("index") before the index-on-index merge.
df1.reset_index().set_index("index").merge(df2.reset_index().set_index("index"), left_index=True, right_index=True).compute()

我创建了一个示例，它会触发与我们升级后遇到的相同的 ValueError。不同之处在于，它先执行 groupby 和求和，并且合并只在外连接（how="outer"）时失败。以下代码在我们当前使用的 Dask 版本（2022.5.2）上运行得非常顺利。所以我不清楚究竟是什么发生了变化，但显然索引表现出了一些意料之外的行为。

import dask.dataframe as dd
import random
import pandas as pd
from dask import delayed

def create_part(cols):
    """Return a 20-row pandas DataFrame of random integers.

    Every name in ``cols`` becomes a column filled with 20 values drawn
    from ``random.randint(0, 10)``. No explicit index is passed.
    """
    n_rows = 20
    columns = {}
    # Columns are filled one at a time, in the order given by ``cols``.
    for name in cols:
        columns[name] = [random.randint(0, 10) for _ in range(n_rows)]
    return pd.DataFrame(data=columns)

# Each frame is built from a single delayed partition and persisted before the groupby.
df1 = dd.from_delayed(delayed(create_part)(["id","number_reads"])).persist()
df2 = dd.from_delayed(delayed(create_part)([ "id","number_views"])).persist()
# Aggregate per "id"; the merge below then joins the two results on their index.
df1 = df1.groupby("id").sum()
df2 = df2.groupby("id").sum()
# Outer join on the index; per the report, the merge fails only with how="outer".
df1.merge(df2, left_index=True, right_index=True,how="outer").compute()

错误:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[149], line 16
     14 df1 = df1.groupby("id").sum()
     15 df2 = df2.groupby("id").sum()
---> 16 df1.merge(df2, left_index=True, right_index=True,how="outer").compute()

File /opt/conda/lib/python3.10/site-packages/dask/base.py:310, in DaskMethodsMixin.compute(self, **kwargs)
    286 def compute(self, **kwargs):
    287     """Compute this dask collection
    288 
    289     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    308     dask.compute
    309     """
--> 310     (result,) = compute(self, traverse=False, **kwargs)
    311     return result

File /opt/conda/lib/python3.10/site-packages/dask/base.py:595, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    592     keys.append(x.__dask_keys__())
    593     postcomputes.append(x.__dask_postcompute__())
--> 595 results = schedule(dsk, keys, **kwargs)
    596 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /opt/conda/lib/python3.10/site-packages/distributed/client.py:3227, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3225         should_rejoin = False
   3226 try:
-> 3227     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3228 finally:
   3229     for f in futures.values():

File /opt/conda/lib/python3.10/site-packages/distributed/client.py:2361, in Client.gather(self, futures, errors, direct, asynchronous)
   2359 else:
   2360     local_worker = None
-> 2361 return self.sync(
   2362     self._gather,
   2363     futures,
   2364     errors=errors,
   2365     direct=direct,
   2366     local_worker=local_worker,
   2367     asynchronous=asynchronous,
   2368 )

File /opt/conda/lib/python3.10/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    349     return future
    350 else:
--> 351     return sync(
    352         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    353     )

File /opt/conda/lib/python3.10/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
    416 if error:
    417     typ, exc, tb = error
--> 418     raise exc.with_traceback(tb)
    419 else:
    420     return result

File /opt/conda/lib/python3.10/site-packages/distributed/utils.py:391, in sync.<locals>.f()
    389         future = wait_for(future, callback_timeout)
    390     future = asyncio.ensure_future(future)
--> 391     result = yield future
    392 except Exception:
    393     error = sys.exc_info()

File /opt/conda/lib/python3.10/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File /opt/conda/lib/python3.10/site-packages/distributed/client.py:2224, in Client._gather(self, futures, errors, direct, local_worker)
   2222         exc = CancelledError(key)
   2223     else:
-> 2224         raise exception.with_traceback(traceback)
   2225     raise exc
   2226 if errors == "skip":

File /opt/conda/lib/python3.10/site-packages/dask/optimization.py:992, in __call__()
    990 if not len(args) == len(self.inkeys):
    991     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 992 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /opt/conda/lib/python3.10/site-packages/dask/core.py:151, in get()
    149 for key in toposort(dsk):
    150     task = dsk[key]
--> 151     result = _execute_task(task, cache)
    152     cache[key] = result
    153 result = _execute_task(out, cache)

File /opt/conda/lib/python3.10/site-packages/dask/core.py:121, in _execute_task()
    117     func, args = arg[0], arg[1:]
    118     # Note: Don't assign the subtask results to a variable. numpy detects
    119     # temporaries by their reference count and can execute certain
    120     # operations in-place.
--> 121     return func(*(_execute_task(a, cache) for a in args))
    122 elif not ishashable(arg):
    123     return arg

File /opt/conda/lib/python3.10/site-packages/dask/core.py:121, in <genexpr>()
    117     func, args = arg[0], arg[1:]
    118     # Note: Don't assign the subtask results to a variable. numpy detects
    119     # temporaries by their reference count and can execute certain
    120     # operations in-place.
--> 121     return func(*(_execute_task(a, cache) for a in args))
    122 elif not ishashable(arg):
    123     return arg

File /opt/conda/lib/python3.10/site-packages/dask/core.py:121, in _execute_task()
    117     func, args = arg[0], arg[1:]
    118     # Note: Don't assign the subtask results to a variable. numpy detects
    119     # temporaries by their reference count and can execute certain
    120     # operations in-place.
--> 121     return func(*(_execute_task(a, cache) for a in args))
    122 elif not ishashable(arg):
    123     return arg

File /opt/conda/lib/python3.10/site-packages/dask/utils.py:73, in apply()
     42 """Apply a function given its positional and keyword arguments.
     43 
     44 Equivalent to ``func(*args, **kwargs)``
   (...)
     70 >>> dsk = {'task-name': task}  # adds the task to a low level Dask task graph
     71 """
     72 if kwargs:
---> 73     return func(*args, **kwargs)
     74 else:
     75     return func(*args)

File /opt/conda/lib/python3.10/site-packages/dask/dataframe/core.py:7089, in apply_and_enforce()
   7087 func = kwargs.pop("_func")
   7088 meta = kwargs.pop("_meta")
-> 7089 df = func(*args, **kwargs)
   7090 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   7091     if not len(df):

File /opt/conda/lib/python3.10/site-packages/dask/dataframe/shuffle.py:778, in partitioning_index()
    761 def partitioning_index(df, npartitions):
    762     """
    763     Computes a deterministic index mapping each record to a partition.
    764 
   (...)
    776         An array of int64 values mapping each record to a partition.
    777     """
--> 778     return hash_object_dispatch(df, index=False) % int(npartitions)

File /opt/conda/lib/python3.10/site-packages/dask/utils.py:642, in __call__()
    638 """
    639 Call the corresponding method based on type of argument.
    640 """
    641 meth = self.dispatch(type(arg))
--> 642 return meth(arg, *args, **kwargs)

File /opt/conda/lib/python3.10/site-packages/dask/dataframe/backends.py:486, in hash_object_pandas()
    482 @hash_object_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
    483 def hash_object_pandas(
    484     obj, index=True, encoding="utf8", hash_key=None, categorize=True
    485 ):
--> 486     return pd.util.hash_pandas_object(
    487         obj, index=index, encoding=encoding, hash_key=hash_key, categorize=categorize
    488     )

File /opt/conda/lib/python3.10/site-packages/pandas/core/util/hashing.py:171, in hash_pandas_object()
    168         hashes = (x for x in _hashes)
    169     h = combine_hash_arrays(hashes, num_items)
--> 171     ser = Series(h, index=obj.index, dtype="uint64", copy=False)
    172 else:
    173     raise TypeError(f"Unexpected type for hashing {type(obj)}")

File /opt/conda/lib/python3.10/site-packages/pandas/core/series.py:500, in __init__()
    498     index = default_index(len(data))
    499 elif is_list_like(data):
--> 500     com.require_length_match(data, index)
    502 # create/copy the manager
    503 if isinstance(data, (SingleBlockManager, SingleArrayManager)):

File /opt/conda/lib/python3.10/site-packages/pandas/core/common.py:576, in require_length_match()
    572 """
    573 Check the length of data matches the length of the index.
    574 """
    575 if len(data) != len(index):
--> 576     raise ValueError(
    577         "Length of values "
    578         f"({len(data)}) "
    579         "does not match length of index "
    580         f"({len(index)})"
    581     )

ValueError: Length of values (0) does not match length of index (8)
© www.soinside.com 2019 - 2024. All rights reserved.