使用dask数组时xarray.open_mfdataset(parallel=True)和xarray.concat有什么区别?

问题描述 投票:0回答:1

我正在使用 python 处理经典的数据处理工作流程,即加载大量文件、预处理它们、连接它们并应用一些缩减。

目前,我正在使用 dask 的两阶段工作流程:

  1. 我创建了一个 dask
    Bag
    来加载和预处理所有输入文件,并将生成的 xarray
    DataSet
    (尺寸为
    (time, lat, lon, fileidx)
    )作为 zarr 文件导出到磁盘。
  2. 我使用
    xarray.open_mfdataset
    打开所有之前的 zarr 文件,将它们沿
    fileidx
    维度连接起来并应用最终的缩减。

以这种方式一切都运行良好,并且并行化很好地适用于这两个步骤。

我天真地尝试通过直接在第一步的输出

xarray.concat
上使用
DataSet
函数而不是zarr写入/读取步骤,将这个两步工作流程简化为单个步骤,但在我的所有尝试中
xarray.concat
使用单个工作程序合并所有内容,从而由于内存问题导致我的计算崩溃。

那么你知道

xarray.concat
xarray.open_mfdataset
串联之间的区别吗?或者如果可能的话如何设法做到这一点?对于小数据,结果(至少看起来)是相同的。

这就是我目前写入和读取 zarr 文件的方式:

# Write
import dask.bag as db

my_data = db.from_sequence(input_files)
            .map(preprocessing)
            .map(to_dataset)
            .map(lambda x: x.chunk(None))
my_data.map(lambda x: x.to_zarr(f"{ZARR_STORE_PATH}/part_{x.fileidx.data.item():02}.zarr")).compute()

# Read
xr.open_mfdataset(
    Path(ZARR_STORE_PATH).glob("part_*.zarr"),
    engine="zarr",
    combine="nested",
    concat_dim="fileidx",
    parallel=True,
)

我尝试过一些代码:

xr.concat(my_data, dim="fileidx")
xr.concat(my_data.to_delayed(), dim="fileidx")
dask.delayed(lambda x: xr.concat(x, dim="fileidx"))(my_data)

这里还有一个讨论:https://github.com/pydata/xarray/issues/4628

python dask python-xarray
1个回答
0
投票

以下两个选项是独立的

  • open_mfdataset(parallel=True) 并行打开并读取所有输入数据集中的元数据
  • chunks={} (或类似)生成 dask.arrays 作为每次读取的支持对象

在创建整个数据集时,您可能需要第二个。另外,虽然你可以在延迟内创建这样的东西,但更正常的是从客户端调用这些“集合”API函数,否则事情可能会出错。

concat
上,这将创建所有数据集的惰性海量数组;所以如果你对它进行操作,它要么加载整个数据(没有
chunks
),要么使用 dask 逐块处理整个数据。否则,客户端中的
concat
应该与
open_mfdataset
非常相似。

© www.soinside.com 2019 - 2024. All rights reserved.