当我尝试在共享索引上合并两个 Dask DataFrame 时,遇到了 `ValueError`。如果我不初始化 `Cluster` 和 `Client`,合并就可以正常运行。
在 Dask 2023.6.0 中重现的步骤:
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import pandas as pd
from dask import delayed
# Start a local cluster with two workers and attach a client to it.
# The report states that the merge works when these two lines are omitted.
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
def create_part(i, col):
    """Build one 10-row partition for the reproduction.

    Parameters
    ----------
    i
        Partition number; used as the prefix of every index label.
    col
        Name of the single data column.

    Returns
    -------
    pandas.DataFrame
        One column named ``col`` holding the integers 0..9, indexed by the
        string labels ``"{i}_0"`` through ``"{i}_9"``.
    """
    return pd.DataFrame(
        data={
            col: list(range(10)),
        },
        index=[f"{i}_{j}" for j in range(10)],
    )
# Build two Dask DataFrames, each from three delayed create_part calls
# (one with column "a", the other with column "b").
df1 = dd.from_delayed([delayed(create_part)(i, "a") for i in range(3)])
df2 = dd.from_delayed([delayed(create_part)(i, "b") for i in range(3)])
# Merge on the index of both frames; this is the call reported to fail with ValueError.
df1.merge(df2, left_index=True, right_index=True).compute()
这会产生以下输出:
2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key: ('create_part-assign-927e22614a94e66b3d2b93debcd761ca', 1)
Function: execute_task
args: ((subgraph_callable-16a882c1-957b-443c-864a-9cf3a3577335, '__hash_partition', 'getitem-fff73ecfd4dfe9d1b867611d90a36992', [], (<function create_part at 0x7f41bc80b6d0>, 1, 'b')))
kwargs: {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"
2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key: ('create_part-assign-927e22614a94e66b3d2b93debcd761ca', 0)
Function: execute_task
args: ((subgraph_callable-16a882c1-957b-443c-864a-9cf3a3577335, '__hash_partition', 'getitem-fff73ecfd4dfe9d1b867611d90a36992', [], (<function create_part at 0x7f41bc80bd00>, 0, 'b')))
kwargs: {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"
2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key: ('create_part-assign-927e22614a94e66b3d2b93debcd761ca', 2)
Function: execute_task
args: ((subgraph_callable-16a882c1-957b-443c-864a-9cf3a3577335, '__hash_partition', 'getitem-fff73ecfd4dfe9d1b867611d90a36992', [], (<function create_part at 0x7f41bc840040>, 2, 'b')))
kwargs: {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"
2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key: ('create_part-assign-09b3043c2ae5ff26d93dd8951c816b5b', 2)
Function: execute_task
args: ((subgraph_callable-d50b8eb3-a0ef-454a-9a52-d58a57c3b693, '__hash_partition', 'getitem-9b88d3b8f6cca25fbd56e9023914c872', [], (<function create_part at 0x7f41bc80beb0>, 2, 'a')))
kwargs: {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"
2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key: ('create_part-assign-09b3043c2ae5ff26d93dd8951c816b5b', 0)
Function: execute_task
args: ((subgraph_callable-d50b8eb3-a0ef-454a-9a52-d58a57c3b693, '__hash_partition', 'getitem-9b88d3b8f6cca25fbd56e9023914c872', [], (<function create_part at 0x7f41bc80b9a0>, 0, 'a')))
kwargs: {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"
2023-06-22 13:45:02,845 - distributed.worker - WARNING - Compute Failed
Key: ('create_part-assign-09b3043c2ae5ff26d93dd8951c816b5b', 1)
Function: execute_task
args: ((subgraph_callable-d50b8eb3-a0ef-454a-9a52-d58a57c3b693, '__hash_partition', 'getitem-9b88d3b8f6cca25fbd56e9023914c872', [], (<function create_part at 0x7f41bc80b7f0>, 1, 'a')))
kwargs: {}
Exception: "ValueError('Length of values (0) does not match length of index (10)')"
我在 Dask 2023.5.1 上遇到了同样的问题。通过研究您的最小示例,我发现先重置索引、再显式地重新设置索引,可以绕过这个问题。这样做开销很大,但这是目前唯一能让它正常工作的办法。
# Workaround: on both frames, move the index into a column and set it back as the index before merging.
df1.reset_index().set_index("index").merge(df2.reset_index().set_index("index"), left_index=True, right_index=True).compute()
我创建了一个示例,它会触发我们升级后遇到的同一个 ValueError。不同之处在于,它还执行了 groupby 和求和,并且合并只在外连接(how="outer")时失败。以下代码在我们当前使用的 Dask 版本(2022.5.2)上运行得非常好。所以我不清楚究竟是什么发生了变化,但显然索引表现出了一些意料之外的行为。
import dask.dataframe as dd
import random
import pandas as pd
from dask import delayed
def create_part(cols):
    """Build a 20-row DataFrame of random integers for the reproduction.

    Parameters
    ----------
    cols
        Iterable of column names; one column is created per name.

    Returns
    -------
    pandas.DataFrame
        Each column holds 20 values drawn independently with
        ``random.randint(0, 10)`` (both endpoints included). No index is
        passed, so pandas assigns its default one.
    """
    return pd.DataFrame(
        data={
            col: [random.randint(0, 10) for _ in range(20)] for col in cols
        },
    )
# Each frame is built from a single delayed create_part call and then persisted.
df1 = dd.from_delayed(delayed(create_part)(["id","number_reads"])).persist()
df2 = dd.from_delayed(delayed(create_part)([ "id","number_views"])).persist()
# Aggregate each frame by "id" with a sum.
df1 = df1.groupby("id").sum()
df2 = df2.groupby("id").sum()
# Outer merge on the index of both results; this is the call reported to raise ValueError.
df1.merge(df2, left_index=True, right_index=True,how="outer").compute()
错误:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Cell In[149], line 16
14 df1 = df1.groupby("id").sum()
15 df2 = df2.groupby("id").sum()
---> 16 df1.merge(df2, left_index=True, right_index=True,how="outer").compute()
File /opt/conda/lib/python3.10/site-packages/dask/base.py:310, in DaskMethodsMixin.compute(self, **kwargs)
286 def compute(self, **kwargs):
287 """Compute this dask collection
288
289 This turns a lazy Dask collection into its in-memory equivalent.
(...)
308 dask.compute
309 """
--> 310 (result,) = compute(self, traverse=False, **kwargs)
311 return result
File /opt/conda/lib/python3.10/site-packages/dask/base.py:595, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
592 keys.append(x.__dask_keys__())
593 postcomputes.append(x.__dask_postcompute__())
--> 595 results = schedule(dsk, keys, **kwargs)
596 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /opt/conda/lib/python3.10/site-packages/distributed/client.py:3227, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3225 should_rejoin = False
3226 try:
-> 3227 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3228 finally:
3229 for f in futures.values():
File /opt/conda/lib/python3.10/site-packages/distributed/client.py:2361, in Client.gather(self, futures, errors, direct, asynchronous)
2359 else:
2360 local_worker = None
-> 2361 return self.sync(
2362 self._gather,
2363 futures,
2364 errors=errors,
2365 direct=direct,
2366 local_worker=local_worker,
2367 asynchronous=asynchronous,
2368 )
File /opt/conda/lib/python3.10/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
349 return future
350 else:
--> 351 return sync(
352 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
353 )
File /opt/conda/lib/python3.10/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
416 if error:
417 typ, exc, tb = error
--> 418 raise exc.with_traceback(tb)
419 else:
420 return result
File /opt/conda/lib/python3.10/site-packages/distributed/utils.py:391, in sync.<locals>.f()
389 future = wait_for(future, callback_timeout)
390 future = asyncio.ensure_future(future)
--> 391 result = yield future
392 except Exception:
393 error = sys.exc_info()
File /opt/conda/lib/python3.10/site-packages/tornado/gen.py:767, in Runner.run(self)
765 try:
766 try:
--> 767 value = future.result()
768 except Exception as e:
769 # Save the exception for later. It's important that
770 # gen.throw() not be called inside this try/except block
771 # because that makes sys.exc_info behave unexpectedly.
772 exc: Optional[Exception] = e
File /opt/conda/lib/python3.10/site-packages/distributed/client.py:2224, in Client._gather(self, futures, errors, direct, local_worker)
2222 exc = CancelledError(key)
2223 else:
-> 2224 raise exception.with_traceback(traceback)
2225 raise exc
2226 if errors == "skip":
File /opt/conda/lib/python3.10/site-packages/dask/optimization.py:992, in __call__()
990 if not len(args) == len(self.inkeys):
991 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 992 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File /opt/conda/lib/python3.10/site-packages/dask/core.py:151, in get()
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
File /opt/conda/lib/python3.10/site-packages/dask/core.py:121, in _execute_task()
117 func, args = arg[0], arg[1:]
118 # Note: Don't assign the subtask results to a variable. numpy detects
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
File /opt/conda/lib/python3.10/site-packages/dask/core.py:121, in <genexpr>()
117 func, args = arg[0], arg[1:]
118 # Note: Don't assign the subtask results to a variable. numpy detects
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
File /opt/conda/lib/python3.10/site-packages/dask/core.py:121, in _execute_task()
117 func, args = arg[0], arg[1:]
118 # Note: Don't assign the subtask results to a variable. numpy detects
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
File /opt/conda/lib/python3.10/site-packages/dask/utils.py:73, in apply()
42 """Apply a function given its positional and keyword arguments.
43
44 Equivalent to ``func(*args, **kwargs)``
(...)
70 >>> dsk = {'task-name': task} # adds the task to a low level Dask task graph
71 """
72 if kwargs:
---> 73 return func(*args, **kwargs)
74 else:
75 return func(*args)
File /opt/conda/lib/python3.10/site-packages/dask/dataframe/core.py:7089, in apply_and_enforce()
7087 func = kwargs.pop("_func")
7088 meta = kwargs.pop("_meta")
-> 7089 df = func(*args, **kwargs)
7090 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
7091 if not len(df):
File /opt/conda/lib/python3.10/site-packages/dask/dataframe/shuffle.py:778, in partitioning_index()
761 def partitioning_index(df, npartitions):
762 """
763 Computes a deterministic index mapping each record to a partition.
764
(...)
776 An array of int64 values mapping each record to a partition.
777 """
--> 778 return hash_object_dispatch(df, index=False) % int(npartitions)
File /opt/conda/lib/python3.10/site-packages/dask/utils.py:642, in __call__()
638 """
639 Call the corresponding method based on type of argument.
640 """
641 meth = self.dispatch(type(arg))
--> 642 return meth(arg, *args, **kwargs)
File /opt/conda/lib/python3.10/site-packages/dask/dataframe/backends.py:486, in hash_object_pandas()
482 @hash_object_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
483 def hash_object_pandas(
484 obj, index=True, encoding="utf8", hash_key=None, categorize=True
485 ):
--> 486 return pd.util.hash_pandas_object(
487 obj, index=index, encoding=encoding, hash_key=hash_key, categorize=categorize
488 )
File /opt/conda/lib/python3.10/site-packages/pandas/core/util/hashing.py:171, in hash_pandas_object()
168 hashes = (x for x in _hashes)
169 h = combine_hash_arrays(hashes, num_items)
--> 171 ser = Series(h, index=obj.index, dtype="uint64", copy=False)
172 else:
173 raise TypeError(f"Unexpected type for hashing {type(obj)}")
File /opt/conda/lib/python3.10/site-packages/pandas/core/series.py:500, in __init__()
498 index = default_index(len(data))
499 elif is_list_like(data):
--> 500 com.require_length_match(data, index)
502 # create/copy the manager
503 if isinstance(data, (SingleBlockManager, SingleArrayManager)):
File /opt/conda/lib/python3.10/site-packages/pandas/core/common.py:576, in require_length_match()
572 """
573 Check the length of data matches the length of the index.
574 """
575 if len(data) != len(index):
--> 576 raise ValueError(
577 "Length of values "
578 f"({len(data)}) "
579 "does not match length of index "
580 f"({len(index)})"
581 )
ValueError: Length of values (0) does not match length of index (8)