读取文件csv并与多个worker、dask.distributed、dask.dataframe进行聚合

问题描述 投票:0回答:1

我有服务器 IP:192.168.33.10 启动 schudeler

dask scheduler --host 0.0.0.0
这是该服务器中的主机,我有文件“/var/shared/job_skills.csv”,工作人员是 192.168.33.11,192.168.33.12 使用此 cmd 启动
dask worker 192.168.33.10:8786  --local-directory /var/test --dashboard-address 8787 --host 0.0.0.0 --worker-port 39040 --nanny-port 39042
我想在master中启动脚本read_csv.py并将任务分发给worker,比如分块数据并进行聚合,每个worker返回结果并打印结果。

我想做这样的 主脚本“read_csv.py”中的这个脚本

import dask
import dask.dataframe as dd
from dask.distributed import Client

dask.config.set({"dataframe.convert-string": False})

client = Client("192.168.33.10:8786")
df = dd.read_csv("/var/shared/foo.csv")

df['job_skills'] = df['job_skills'].fillna('')
df = df["job_skills"].str.split(',').explode().str.strip()

grouped = df.value_counts().compute()
print(grouped)

工人们给我这样的:

2024-02-29 14:30:04,180 - distributed.worker - WARNING - Compute Failed
Key:       ('str-strip-956897fad2adeffa85aa604734f0febb', 0)
Function:  execute_task
args:      ((subgraph_callable-50304a7f18bfe19fd3ff56b4a6d6db4f, 'str', 'strip', (), {}, 'explode-e2622e44a85e1396024ff799e4f97b6e', 'split', {'pat': ',', 'n': -1, 'expand': False}, 'getitem-f2b8974c7433cce59ce3453d7f35940e', 'job_skills', '', 'getitem-b9a23a03a236420b7c31ada8ec6055df', [(<function read_block_from_file at 0x7fd18829f920>, <OpenFile '/var/shared/foo.csv'>, 0, 1398, b'\n'), None, True, True]))
kwargs:    {}
Exception: "FileNotFoundError(2, 'No such file or directory')"
 


我该如何解决这个问题?

python dataframe dask dask-distributed dask-dataframe
1个回答
0
投票

解决此问题的一种方法,无需将文件复制给工作人员。

  • 确保托管文件的计算机正在运行 sshd(即,您可以从工作计算机通过 ssh 访问它)
  • 客户端和工作人员的 ssh 访问是通过密钥而不是用户名/密码(您需要任何连接都是非交互式的)
  • 使您的读取命令类似于
df = dd.read_csv("ssh://server:port//var/shared/foo.csv")

现在每个工作人员将直接从源中读取,并且仅读取将要处理的那些字节。

其他人可能的替代方案:

  • 挂载 NFS 等共享文件系统,使文件在工作人员和客户端看来就像本地文件
  • 将文件存储在对象存储中,而不是普通的磁盘位置。该对象存储仍然可以是本地的,例如提供 S3 兼容 API 的 minio。
© www.soinside.com 2019 - 2024. All rights reserved.