我正在尝试使用Dask在本地处理单个大(1TB)json文件。该文件每行有一个对象。当我未在read_text
函数中指定块大小时,代码可以完美运行,但只能在一个工作程序上运行。然后仅创建一个分区,并且在仪表板中只能看到一个任务。如果我确实指定blocksize
,则所有工作人员都会获得任务,但他们永远不会继续进行处理(至少不是在12小时之内)。怎么了?我如何让所有工人真正地工作?
代码如下:
import dask.bag as db
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4,
threads_per_worker=2,
memory_limit='2GB')
client = Client(cluster)
db.read_text('./data/uncompressed/latest-all.json', blocksize=1e8)\
.map(lambda obj: obj[:-2])\
.map(parse_json)\
.map(prune)\
.filter(lambda obj: obj != None)\
.map(json.dumps)\
.to_textfiles('./data/proc/*.json')
parse_json
和prune
都是纯Python函数,没有IO。
我的第一个猜测是它们正在工作,但是您的功能非常慢。您可以通过...]查看您的工人在做什么
您也可以考虑使用较小的块,以查看是否有助于使事情更快地移动。