I'm new to data science / Python, a fast learner, an engineer by trade and a geek at heart. I know this is an error, but there may be a way around it, and I'll take any ambitious ideas.
I have instantiated a Dask LocalCluster and Client on my local machine, a Ryzen 3800X with 32 GB of RAM:
from dask.distributed import Client, LocalCluster
daskcluster = LocalCluster(host='0.0.0.0')
daskclient = Client(daskcluster)
daskclient
Scheduler: tcp://192.168.1.152:62020
Then I try to read the dataset. Both pandas and Dask will happily read the 25 GB directory of 446 *.csv files (given time):
%%time
import dask.dataframe as dd

df = dd.read_csv(origPathFile)        # Yeah, a while.
df = df.set_index("Date (UTC)")       # expect almost 5 minutes
df = df.drop_duplicates()             # HOURS
df = df.repartition(npartitions=600)  # new - yet to time
df.to_parquet(outpathfile)            # this is the line which commits and computes the above
When I let these 4 workers get on with it — 8 cores, 16 threads — great, no problem. But I'm doing this to learn, right? I also have a Mac mini with 8 GB and another Ryzen 3600 with 32 GB of RAM, and RAM seems to be my bottleneck. So I open a terminal / Anaconda Prompt on the other Ryzen or the Mac mini and start a worker to join the fray:
dask-worker --memory-limit 10GB 192.168.1.152:62020
and I get this error:
FileNotFoundError                         Traceback (most recent call last)
<timed exec> in <module>
C:\ProgramData\Anaconda3\lib\site-packages\dask\dataframe\core.py in set_index(***failed resolving arguments***)
C:\ProgramData\Anaconda3\lib\site-packages\dask\dataframe\shuffle.py in set_index(df, index, npartitions, shuffle, compute, drop, upsample, divisions, partition_size, **kwargs)
C:\ProgramData\Anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in get(...)
C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in gather(...)
C:\ProgramData\Anaconda3\lib\site-packages\distributed\utils.py in sync(...)
...
C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in _gather(self, futures, errors, direct, local_worker)
/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/dask/bytes/core.py in read_block_from_file()
/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/fsspec/core.py in __enter__()
/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/fsspec/spec.py in open()
/Applications/Anaconda/anaconda3/lib/python3.7/site-packages/fsspec/implementations/local.py in _open()
FileNotFoundError: [Errno 2] No such file or directory: 'c:/Users/username/Python/data/origData/data-mtm-ss-wtt-2020-03-02-19-54-00.csv'
Which makes sense! It looks as if the remote worker is trying to resolve that path on its own C:\ drive? (Note the traceback crosses from `C:\ProgramData\Anaconda3` onto the Mac's `/Applications/Anaconda` filesystem.) I hit the same error the other way round: when one of the other machines runs the LocalCluster and I register this machine's workers with it, read_csv fails for any worker that isn't on the machine holding the files.
I have a NAS, and I can run an FTP server on it... but no, that throws other errors — with or without remote workers — around having to negotiate the account login:
%%time
df = dd.read_csv('ftp://nas.local/PythonData/origData/*.csv')
df = df.set_index("Date (UTC)")       # expect almost 120 seconds
df = df.drop_duplicates()             # HOURS
df = df.repartition(npartitions=600)
df.to_parquet(outpathfile)
Then, about 3 seconds later...
KilledWorker                              Traceback (most recent call last)
<timed exec> in <module>
C:\ProgramData\Anaconda3\lib\site-packages\dask\dataframe\core.py in set_index(***failed resolving arguments***)
C:\ProgramData\Anaconda3\lib\site-packages\dask\dataframe\shuffle.py in set_index(df, index, npartitions, shuffle, compute, drop, upsample, divisions, partition_size, **kwargs)
C:\ProgramData\Anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in get(...)
C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in gather(...)
...
C:\ProgramData\Anaconda3\lib\site-packages\distributed\client.py in _gather(self, futures, errors, direct, local_worker)
KilledWorker: ("('from-delayed-pandas_read_text-read-block-getitem-63774c26477d7b369d337b87bd2c5520', 587)", <Worker 'tcp://192.168.1.152:62050', name: 0, memory: 0, processing: 381>)
I'm doing this for fun, to understand the limits, and I'd really love to have the remote PCs help out. But every time it reads the CSVs, the sticking point seems to be that it doesn't just read them — it then tries to hold all of them in RAM at the same time... I know another 32 GB of RAM is only $300, but learning limits like this is the point.
I've already written this program once in pandas. After getting the right columns into the right units, I have to run rolling windows over this 216-million-row dataset. Dask's distributed nature is a great fit — and this dataset is only 6 hours out of a two-month set. I'm hoping to make it work on this "small" subset so I can extrapolate to bigger and better things. As it stands, it runs for 35 minutes, then sucks up all the RAM and the page file and locks the system.
Ideally, could Dask be told to let only local workers read local files? Just for this program? I only have local machines — no cloud cluster, no big-data web service. Just a little QNAP NAS and a couple of newish Ryzen PCs, and a weekend of self-isolation to learn Python in.
Ideas? Go!
It sounds like you have two machines:
- a Mac mini
- a PC