我是 Dask 的新手。我创建了一个 dask 数据框,使用 drop 命令删除了一些列。此后我将执行其他操作。当我调用计算时,出现错误:“Future”对象没有属性“drop”。
在 drop 命令之后立即调用compute() 工作正常,但是当我在之后调用它几个语句时,它给了我这个错误。请提出这里有什么问题。
顺便说一句,我使用的是分布在我本地机器上的dask,版本是1.2.1。
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
df = dd.read_csv("XYZ.csv", sep="\t",low_memory=False) #Its about 3 GB in size
df = df.persist() #Data is split ito 47 partitions
list_of_columns_to_delete = ['ABC', 'AXY', 'JDR']
df = df.drop(list_of_columns_to_delete, axis=1, errors=True)
df.EngineSpeed.mean().compute() #this works fine and computes the mean
df = df[(df.Time < "23:59:59") ]
df = df[df.EngineSpeed > 605]
df = df[df.ServiceBrakeCircuit1AirPressure.notnull()]
df = df[df.ServiceBrakeCircuit2AirPressure.notnull()]
df.GpsSpeed = df.GpsSpeed.where(df.GpsSpeed < 111,111)
df.GpsSpeed.mean().compute() #This gives 'Future' object has no attribute 'drop' error`
请建议错误的含义以及如何纠正它。
我尝试使用类似的数据集产生错误,一切正常
In [1]: import dask
In [2]: df = dask.datasets.timeseries()
In [3]: from dask.distributed import Client
In [4]: client = Client()
In [5]: df = df.persist()
In [6]: df
Out[6]:
Dask DataFrame Structure:
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: make-timeseries, 30 tasks
In [7]: df = df.drop(['x', 'name'], axis=1, errors=True)
In [8]: df.y.mean().compute()
Out[8]: 0.00012229375505932677
我建议制作一个MCVE
对我来说,问题是客户端不匹配:由于某种原因(可能是我自己做的),默认客户端与我用来调用的客户端不匹配
.persist
,并且他们无法相互交谈(默认是本地客户端,远程是K8s)
在执行依赖于默认客户端的操作时出现问题,例如
dask.dataframe.from_delayed
,它将验证默认客户端内的元数据。
这里有一个神奇的咒语可以解决这个问题:
from distributed.client import ensure_default_client
import dask
import dask.dataframe as dd
ensure_default_client(my_client)
dask.config.set(scheduler=my_client) # Interestingly, this is also needed
# Now we can safely use operations that don't let you specify a scheduler.
ddf = dd.from_delayed(my_delayed_things, meta=my_meta_df)
ddf = ddf.persist(scheduler=my_client) # And now this works