我正在使用 python 处理经典的数据处理工作流程,即加载大量文件、预处理它们、连接它们并应用一些缩减。
目前,我正在使用 dask 的两阶段工作流程:
Bag
来加载和预处理所有输入文件,并将生成的 xarray DataSet
(尺寸为 (time, lat, lon, fileidx)
)作为 zarr 文件导出到磁盘。xarray.open_mfdataset
打开所有之前的 zarr 文件,将它们沿 fileidx
维度连接起来并应用最终的缩减。以这种方式一切都运行良好,并且并行化很好地适用于这两个步骤。
我天真地尝试通过直接在第一步的输出
xarray.concat
上使用DataSet
函数而不是zarr写入/读取步骤,将这个两步工作流程简化为单个步骤,但在我的所有尝试中xarray.concat
使用单个工作程序合并所有内容,从而由于内存问题导致我的计算崩溃。
那么你知道
xarray.concat
和xarray.open_mfdataset
串联之间的区别吗?或者如果可能的话如何设法做到这一点?对于小数据,结果(至少看起来)是相同的。
这就是我目前写入和读取 zarr 文件的方式:
# Write
import dask.bag as db
my_data = db.from_sequence(input_files)
.map(preprocessing)
.map(to_dataset)
.map(lambda x: x.chunk(None))
my_data.map(lambda x: x.to_zarr(f"{ZARR_STORE_PATH}/part_{x.fileidx.data.item():02}.zarr")).compute()
# Read
xr.open_mfdataset(
Path(ZARR_STORE_PATH).glob("part_*.zarr"),
engine="zarr",
combine="nested",
concat_dim="fileidx",
parallel=True,
)
我尝试过一些代码:
xr.concat(my_data, dim="fileidx")
xr.concat(my_data.to_delayed(), dim="fileidx")
dask.delayed(lambda x: xr.concat(x, dim="fileidx"))(my_data)
以下两个选项是独立的
在创建整个数据集时,您可能需要第二个。另外,虽然你可以在延迟内创建这样的东西,但更正常的是从客户端调用这些“集合”API函数,否则事情可能会出错。
在
concat
上,这将创建所有数据集的惰性海量数组;所以如果你对它进行操作,它要么加载整个数据(没有 chunks
),要么使用 dask 逐块处理整个数据。否则,客户端中的 concat
应该与 open_mfdataset
非常相似。