Dask分布式:从HDFS读取.csv

问题描述 投票:3回答:2

我是使用"Distributed Pandas on a Cluster with Dask DataFrames"作为指南测试Dask的性能。

在马修的例子中,他有一个20GB的文件和64个工作人员(8个物理节点)。

就我而言,我有一个82GB的文件和288个工作人员(12个物理节点;每个节点都有一个HDFS数据节点)。

在所有12个节点上,我可以访问HDFS并执行一个显示文件信息的简单Python脚本:

import pyarrow as pa
fs = pa.hdfs.connect([url], 8022)
print(str(fs.info('/path/to/file.csv')))

如果我只使用运行Dask Scheduler的机器创建一个单节点集群(只有24个工作者),我可以从HDFS读取.csv并打印长度:

import dask
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
dask.config.set(hdfs_backend='pyarrow')
df = dd.read_csv('hdfs://[url]:8022/path/to/file.csv')
df = client.persist(df)
print(str(len(df)))

最后一行给出了1046250873(很好!)并且需要3m17才能运行。

但是,当我使用完整集群时,调用len(df)的最后一行会死,我收到此错误:

KilledWorker :(“('pandas_read_text-read-block-from-delayed-9ad3beb62f0aea4a07005d5c98749d7e',1201)”,'tcp:// [url]:42866')

这类似于提到的问题here有一个解决方案here涉及Dask Yarn和配置(?)看起来像:worker_env={'ARROW_LIBHDFS_DIR': ...}

但是,我没有使用Yarn,虽然我的猜测是Dask Workers在某种程度上没有配置他们需要的HDFS / Arrow路径才能连接。

我没有看到任何关于这方面的文件,因此我的问题是我缺少什么。

编辑:

这是我在Dask Workers的输出中看到的错误回溯:

distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95N\x05\x00\x00\x00\x00\x00\x00(\x8c\x14dask.dataframe.utils\x94\x8c\ncheck_meta\x94\x93\x94(\x8c\x12dask
.compatibility\x94\x8c\x05apply\x94\x93\x94\x8c\x15dask.dataframe.io.csv\x94\x8c\x10pandas_read_text\x94\x93\x94]\x94(\x8c\x11pandas.io.parsers\x94\x8c\x08read_csv\x94\x93\x94(
\x8c\x0fdask.bytes.core\x94\x8c\x14read_block_from_file\x94\x93\x94h\r\x8c\x08OpenFile\x94\x93\x94(\x8c\x12dask.bytes.pyarrow\x94\x8c\x17PyArrowHadoopFileSystem\x94\x93\x94)\x8
1\x94}\x94(\x8c\x02fs\x94\x8c\x0cpyarrow.hdfs\x94\x8c\x10HadoopFileSystem\x94\x93\x94(\x8c\r10.255.200.91\x94MV\x1fNN\x8c\x07libhdfs\x94Nt\x94R\x94\x8c\x08protocol\x94\x8c\x04h
dfs\x94ub\x8c\x1a/path/to/file.csv\x94\x8c\x02rb\x94NNNt\x94R\x94K\x00J\x00\x90\xd0\x03C\x01\n\x94t\x94C\x12animal,weight,age\n\x94\x8c\x08builtins\x94\x8c\x04dict\x94
\x93\x94]\x94\x86\x94h*]\x94(]\x94(\x8c\x06animal\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02O8\x94K\x00K\x01\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff
\xff\xff\xffK?t\x94be]\x94(\x8c\x06weight\x94h2\x8c\x02i8\x94K\x00K\x01\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94be]\x94(\x8c\x03age\x94h<e
e\x86\x94]\x94(h/h9h@eeh*]\x94(]\x94(\x8c\x0cwrite_header\x94\x89e]\x94(\x8c\x07enforce\x94\x89e]\x94(\x8c\x04path\x94Nee\x86\x94t\x94\x8c\x11pandas.core.frame\x94\x8c\tDataFra
me\x94\x93\x94)\x81\x94}\x94(\x8c\x05_data\x94\x8c\x15pandas.core.internals\x94\x8c\x0cBlockManager\x94\x93\x94)\x81\x94(]\x94(\x8c\x18pandas.core.indexes.base\x94\x8c\n_new_In
dex\x94\x93\x94hW\x8c\x05Index\x94\x93\x94}\x94(\x8c\x04data\x94\x8c\x15numpy.core.multiarray\x94\x8c\x0c_reconstruct\x94\x93\x94h0\x8c\x07ndarray\x94\x93\x94K\x00\x85\x94C\x01
b\x94\x87\x94R\x94(K\x01K\x03\x85\x94h5\x89]\x94(h/h9h@et\x94b\x8c\x04name\x94Nu\x86\x94R\x94hY\x8c\x19pandas.core.indexes.range\x94\x8c\nRangeIndex\x94\x93\x94}\x94(hjN\x8c\x0
5start\x94K\x00\x8c\x04stop\x94K\x00\x8c\x04step\x94K\x01u\x86\x94R\x94e]\x94(h`hbK\x00\x85\x94hd\x87\x94R\x94(K\x01K\x02K\x00\x86\x94h<\x89C\x00\x94t\x94bh`hbK\x00\x85\x94hd\x
87\x94R\x94(K\x01K\x01K\x00\x86\x94h5\x89]\x94t\x94be]\x94(hYh[}\x94(h]h`hbK\x00\x85\x94hd\x87\x94R\x94(K\x01K\x02\x85\x94h5\x89]\x94(h9h@et\x94bhjNu\x86\x94R\x94hYh[}\x94(h]h`
hbK\x00\x85\x94hd\x87\x94R\x94(K\x01K\x01\x85\x94h5\x89]\x94h/at\x94bhjNu\x86\x94R\x94e}\x94\x8c\x060.14.1\x94}\x94(\x8c\x04axes\x94hV\x8c\x06blocks\x94]\x94(}\x94(\x8c\x06valu
es\x94hy\x8c\x08mgr_locs\x94h(\x8c\x05slice\x94\x93\x94K\x01K\x03K\x01\x87\x94R\x94u}\x94(h\x9dh\x7fh\x9eh\xa0K\x00K\x01K\x01\x87\x94R\x94ueust\x94b\x8c\x04_typ\x94\x8c\tdatafr
ame\x94\x8c\t_metadata\x94]\x94ub\x8c\x0cfrom_delayed\x94t\x94.'
Traceback (most recent call last):
  File "/usr/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
  File "/usr/lib64/python3.6/site-packages/pyarrow/hdfs.py", line 38, in __init__
    self._connect(host, port, user, kerb_ticket, driver, extra_conf)
  File "pyarrow/io-hdfs.pxi", line 89, in pyarrow.lib.HadoopFileSystem._connect
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Unable to load libjvm

同样,我可以使用pyarrow从12个节点中的任何节点读取成功从HDFS读取的文件。

python hdfs dask dask-distributed
2个回答
0
投票

查看回溯,我的猜测是PyArrow未正确安装在工作节点上。我可能会在PyArrow问题跟踪器上询问他们是否可以帮助您诊断该追溯。


0
投票

何男孩!在从头开始构建libhdfs3并部署到集群的一部分并找到相同的确切结果(ImportError: Can not find the shared library: libhdfs3.so)后,我意识到问题是我一直在通过pssh启动Dask工作程序,因此他们没有捕获应该使用的环境变量。

© www.soinside.com 2019 - 2024. All rights reserved.