Dask cluster: read_csv fails only when remote dask-workers join


I'm new to data science / Python but a fast learner: an industry engineer by trade and a geek at heart. I know this is an error, but there may be a way around it, and I'll take any wild ideas.

I've instantiated a Dask LocalCluster and Client on my local machine, a Ryzen 3800X with 32GB of RAM:

from dask.distributed import Client, LocalCluster
daskcluster = LocalCluster(host='0.0.0.0')
daskclient = Client(daskcluster)
daskclient

Scheduler: tcp://192.168.1.152:62020

Dashboard: http://192.168.1.152:8787/status

Then I try to read the dataset: a 25GB directory of 446 *.csv files, which Pandas and Dask are both perfectly happy to read (processing times noted in the comments):

%%time
import dask.dataframe as dd  # needed for dd.read_csv below

df = dd.read_csv(origPathFile)          # Yeah, a while.
df = df.set_index("Date (UTC)")         # expect almost 5 minutes
df = df.drop_duplicates()               # HOURS
df = df.repartition(npartitions=600)    # new - yet to time
df.to_parquet(outpathfile)              # this is the line which commits and computes the above

When I just let those 4 local workers go at it (8 cores, 16 threads), great, no problem. But I'm doing this to learn, right? I have a Mac Mini with 8GB and another Ryzen 3600 with 32GB of RAM. RAM seems to be my bottleneck.
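(Side note, untested: LocalCluster also accepts an explicit worker count and a per-worker memory limit, which might keep the 32GB box from paging; the numbers below are just guesses on my part.)

from dask.distributed import Client, LocalCluster

# Untested sketch: 4 workers x 4 threads, roughly 7GB each on the 32GB machine.
daskcluster = LocalCluster(host='0.0.0.0', n_workers=4,
                           threads_per_worker=4, memory_limit='7GB')
daskclient = Client(daskcluster)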

When I fire up an Anaconda Prompt on the other Ryzen, or Terminal on the Mac Mini, and start a worker to join the party,

dask-worker --memory-limit 10GB 192.168.1.152:62020

I get this error:

FileNotFoundError                         Traceback (most recent call last)
<timed exec> in <module>

C:\ProgramData\Anaconda3\lib\site-packages\dask\dataframe\core.py in set_index(***failed resolving arguments***)
   3615                 npartitions=npartitions,
   3616                 divisions=divisions,
-> 3617                 **kwargs
   3618             )
   3619

C:\ProgramData\Anaconda3\lib\site-packages\dask\dataframe\shuffle.py
in set_index(df, index, npartitions, shuffle, compute, drop, upsample,
divisions, partition_size, **kwargs)
     83         sizes, mins, maxes = base.optimize(sizes, mins, maxes)
     84         divisions, sizes, mins, maxes = base.compute(
---> 85             divisions, sizes, mins, maxes, optimize_graph=False
     86         )
     87         divisions = divisions.tolist()

C:\ProgramData\Anaconda3\lib\site-packages\dask\base.py in
compute(*args, **kwargs)
    442         postcomputes.append(x.__dask_postcompute__())
    443 
--> 444     results = schedule(dsk, keys, **kwargs)
    445     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    446 

C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2664                     should_rejoin = False
   2665             try:
-> 2666                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2667             finally:
   2668                 for f in futures.values():

C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in gather(self, futures, errors, direct, asynchronous)
   1965                 direct=direct,
   1966                 local_worker=local_worker,
-> 1967                 asynchronous=asynchronous,
   1968             )
   1969

C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in
sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    814         else:
    815             return sync(
--> 816                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    817             )
    818 

C:\ProgramData\Anaconda3\lib\site-packages\distributed\utils.py in
sync(loop, func, callback_timeout, *args, **kwargs)
    345     if error[0]:
    346         typ, exc, tb = error[0]
--> 347         raise exc.with_traceback(tb)
    348     else:
    349         return result[0]

C:\ProgramData\Anaconda3\lib\site-packages\distributed\utils.py in f()
    329             if callback_timeout is not None:
    330                 future = asyncio.wait_for(future, callback_timeout)
--> 331             result[0] = yield future
    332         except Exception as exc:
    333             error[0] = sys.exc_info()

C:\ProgramData\Anaconda3\lib\site-packages\tornado\gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in _gather(self, futures, errors, direct, local_worker)
   1824                             exc = CancelledError(key)
   1825                         else:
-> 1826                             raise exception.with_traceback(traceback)
   1827                         raise exc
   1828                     if errors == "skip":

/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/dask/bytes/core.py
in read_block_from_file()

/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/fsspec/core.py
in __enter__()

/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/fsspec/spec.py
in open()

/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/fsspec/implementations/local.py
in _open()

/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/fsspec/implementations/local.py
in __init__()

/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/fsspec/implementations/local.py
in _open()

FileNotFoundError: [Errno 2] No such file or directory:
'c:/Users/username/Python/data/origData/data-mtm-ss-wtt-2020-03-02-19-54-00.csv'

But that almost makes sense! It's as if the remote worker is looking at its own C:\ ??? I see the same error when one of the other machines runs the LocalCluster and I start workers that register to it from here; whenever workers from more than one machine are registered to a single LocalCluster, read_csv fails.
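(One way I can sanity-check that theory, untested here and with the path hard-coded only for illustration, is to ask every worker whether it can actually see the directory; Client.run executes a function on each connected worker and returns the results keyed by worker address.)

import os

# Hypothetical check: substitute the directory actually passed to read_csv.
check_dir = 'c:/Users/username/Python/data/origData/'

# Returns {worker_address: True/False}; the remote machines should come
# back False, confirming each worker is looking at its own local disk.
print(daskclient.run(lambda: os.path.exists(check_dir)))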

I do have a NAS I could set up an FTP server on... nope: that throws different errors, with or without remote workers, and you have to get around the account login.

%%time 
df = dd.read_csv('ftp://nas.local/PythonData/origData/*.csv') 
df = df.set_index("Date (UTC)") # expect almost 120 seconds
df = df.drop_duplicates() ## HOURS
df = df.repartition(npartitions=600)
df.to_parquet(outpathfile)

Then, after about 3 seconds of waiting...

KilledWorker                              Traceback (most recent call last)
<timed exec> in <module>

C:\ProgramData\Anaconda3\lib\site-packages\dask\dataframe\core.py in set_index(***failed resolving arguments***)
   3615                 npartitions=npartitions,
   3616                 divisions=divisions,
-> 3617                 **kwargs
   3618             )
   3619

C:\ProgramData\Anaconda3\lib\site-packages\dask\dataframe\shuffle.py
in set_index(df, index, npartitions, shuffle, compute, drop, upsample,
divisions, partition_size, **kwargs)
     83         sizes, mins, maxes = base.optimize(sizes, mins, maxes)
     84         divisions, sizes, mins, maxes = base.compute(
---> 85             divisions, sizes, mins, maxes, optimize_graph=False
     86         )
     87         divisions = divisions.tolist()

C:\ProgramData\Anaconda3\lib\site-packages\dask\base.py in
compute(*args, **kwargs)
    442         postcomputes.append(x.__dask_postcompute__())
    443 
--> 444     results = schedule(dsk, keys, **kwargs)
    445     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    446 

C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2664                     should_rejoin = False
   2665             try:
-> 2666                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2667             finally:
   2668                 for f in futures.values():

C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in gather(self, futures, errors, direct, asynchronous)
   1965                 direct=direct,
   1966                 local_worker=local_worker,
-> 1967                 asynchronous=asynchronous,
   1968             )
   1969

C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in
sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    814         else:
    815             return sync(
--> 816                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    817             )
    818 

C:\ProgramData\Anaconda3\lib\site-packages\distributed\utils.py in
sync(loop, func, callback_timeout, *args, **kwargs)
    345     if error[0]:
    346         typ, exc, tb = error[0]
--> 347         raise exc.with_traceback(tb)
    348     else:
    349         return result[0]

C:\ProgramData\Anaconda3\lib\site-packages\distributed\utils.py in f()
    329             if callback_timeout is not None:
    330                 future = asyncio.wait_for(future, callback_timeout)
--> 331             result[0] = yield future
    332         except Exception as exc:
    333             error[0] = sys.exc_info()

C:\ProgramData\Anaconda3\lib\site-packages\tornado\gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in _gather(self, futures, errors, direct, local_worker)
   1824                             exc = CancelledError(key)
   1825                         else:
-> 1826                             raise exception.with_traceback(traceback)
   1827                         raise exc
   1828                     if errors == "skip":

KilledWorker:
("('from-delayed-pandas_read_text-read-block-getitem-63774c26477d7b369d337b87bd2c5520',
587)", <Worker 'tcp://192.168.1.152:62050', name: 0, memory: 0,
processing: 381>)

I'm doing this for fun, to learn the limits. I'd love the remote PCs to help chew through this, but the whole point, as I understand it, is that Dask doesn't read all the CSVs at once and hold them all in RAM at the same time... I know another 32GB of RAM is only $300, but I'm trying to learn where the limits are.

The program itself is something I've already written once in Pandas. After getting the right columns into the right units, it has to run rolling windows over this 216-million-row dataset. Dask's distributed nature seems like a great fit. And this dataset is only 6 hours out of a two-month dataset; I want it to handle this "small" subset so I can extrapolate to bigger and better things. The Pandas version runs for 35 minutes, then sucks up all the RAM plus the page file and locks the system.
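(For context, the rolling step I have in mind looks roughly like this in Dask; "value" and the 1-hour window are placeholders, and it assumes the "Date (UTC)" index has been parsed to datetimes with known divisions.)

# Rough sketch of the rolling-window step, assuming df is the Dask
# DataFrame built above, sorted on a datetime "Date (UTC)" index.
# "value" and the 1-hour window are placeholders for my real columns.
rolled = df["value"].rolling("1h").mean()
rolled.to_frame().to_parquet(outpathfile)  # write out rather than .compute() into RAM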

Ideally, is there a Dask update that lets only local workers read local files? Or a way to do that programmatically? I only have local machines: no web cluster, no hosted big-data service. Just a little QNAP NAS and a couple of new Ryzen PCs, and weekends in self-isolation to learn Python.
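(From what I've read, the usual fix is a path every worker can resolve identically, e.g. a network share mounted the same way on all machines. Short of that, the untested sketch below, with a made-up worker address, should pin the work to the workers that actually have the files; it reuses dd, origPathFile, outpathfile and daskclient from above.)

# Untested sketch: restrict the computation to workers on the machine
# that actually has the CSVs. The address below is an example only.
local_workers = ['tcp://192.168.1.152:62050']

df = dd.read_csv(origPathFile)
df = df.set_index("Date (UTC)")
df = df.drop_duplicates().repartition(npartitions=600)

# to_parquet(compute=False) returns a delayed task graph; Client.compute
# accepts workers= / allow_other_workers= to restrict where tasks run.
task = df.to_parquet(outpathfile, compute=False)
future = daskclient.compute(task, workers=local_workers,
                            allow_other_workers=False)
future.result()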

Ideas? Go!


python dask dask-distributed

1 Answer

It looks like you have two machines:

- Mac mini
- PC
