Dask Dataframe 错误:“Future”对象没有属性“drop”

问题描述 投票:0回答:2

我是 Dask 的新手。我创建了一个 dask 数据框,使用 drop 命令删除了一些列。此后我将执行其他操作。当我调用计算时,出现错误:“Future”对象没有属性“drop”。

在 drop 命令之后立即调用compute() 工作正常,但是当我在之后调用它几个语句时,它给了我这个错误。请提出这里有什么问题。

顺便说一句,我使用的是分布在我本地机器上的dask,版本是1.2.1。

import dask.dataframe as dd  
from dask.distributed import Client  
client = Client()   
df = dd.read_csv("XYZ.csv", sep="\t",low_memory=False) #Its about 3 GB in size   
df = df.persist() #Data is split ito 47 partitions   

list_of_columns_to_delete = ['ABC', 'AXY', 'JDR']    

df = df.drop(list_of_columns_to_delete, axis=1, errors=True)   

df.EngineSpeed.mean().compute() #this works fine and computes the mean   
df = df[(df.Time < "23:59:59") ]   
df = df[df.EngineSpeed > 605]   
df = df[df.ServiceBrakeCircuit1AirPressure.notnull()]   
df = df[df.ServiceBrakeCircuit2AirPressure.notnull()]   
df.GpsSpeed = df.GpsSpeed.where(df.GpsSpeed < 111,111)    
df.GpsSpeed.mean().compute() #This gives 'Future' object has no attribute 'drop' error`     

请建议错误的含义以及如何纠正它。

python dask
2个回答
1
投票

我尝试使用类似的数据集产生错误,一切正常

In [1]: import dask

In [2]: df = dask.datasets.timeseries()

In [3]: from dask.distributed import Client

In [4]: client = Client()

In [5]: df = df.persist()

In [6]: df
Out[6]:
Dask DataFrame Structure:
                   id    name        x        y
npartitions=30
2000-01-01      int64  object  float64  float64
2000-01-02        ...     ...      ...      ...
...               ...     ...      ...      ...
2000-01-30        ...     ...      ...      ...
2000-01-31        ...     ...      ...      ...
Dask Name: make-timeseries, 30 tasks

In [7]: df = df.drop(['x', 'name'], axis=1, errors=True)

In [8]: df.y.mean().compute()
Out[8]: 0.00012229375505932677

我建议制作一个MCVE


0
投票

对我来说,问题是客户端不匹配:由于某种原因(可能是我自己做的),默认客户端与我用来调用的客户端不匹配

.persist
,并且他们无法相互交谈(默认是本地客户端,远程是K8s)

在执行依赖于默认客户端的操作时出现问题,例如

dask.dataframe.from_delayed
,它将验证默认客户端内的元数据。

这里有一个神奇的咒语可以解决这个问题:

from distributed.client import ensure_default_client
import dask 
import dask.dataframe as dd

ensure_default_client(my_client)
dask.config.set(scheduler=my_client) # Interestingly, this is also needed

# Now we can safely use operations that don't let you specify a scheduler.
ddf = dd.from_delayed(my_delayed_things, meta=my_meta_df) 
ddf = ddf.persist(scheduler=my_client) # And now this works
© www.soinside.com 2019 - 2024. All rights reserved.