Xarray/Dask:map_block 计算 - 变量大小不一致

问题描述 投票:0回答:1

有关 Python 代码的可行示例:

import numpy as np
import xarray as xr
from scipy.optimize import curve_fit
import pandas as pd
import xesmf as xe

from dask.distributed import Client

def diurnal_cycle_model(hours, intercept, coef_cos, coef_sin):
    return intercept + coef_cos * np.cos(2 * np.pi * hours / 24) + coef_sin * np.sin(2 * np.pi * hours / 24)

def harm_fit(data):
    data = data.fillna(0)
    ndim = len(data.dims)
    hours = [3,9,15,21]
    
    params = xr.apply_ufunc(curve_fit, diurnal_cycle_model, hours, data,
                            input_core_dims=[[], ['step'], ['step']],
                            output_core_dims=[['param'], ['param1', 'param2']],
                            vectorize=True, dask='parallelized', output_dtypes=[float, float])
    
    intercept, coef_cos, coef_sin = params[0].isel(param=0), params[0].isel(param=1), params[0].isel(param=2)

    hours_reconstructed = xr.DataArray(data=np.arange(0,24),dims=['step'],coords={'step':pd.timedelta_range(start='0 hour',end='23 hours',freq='1H')})
    dc_reconstructed = diurnal_cycle_model(hours_reconstructed.expand_dims(dim={'time':intercept.time,'number':intercept.number,'lat':intercept.lat,'lon':intercept.lon},axis=(1,2,3,4)),
                                           intercept,
                                           coef_cos,
                                           coef_sin)
    return dc_reconstructed

if __name__ == '__main__':
    s2s_fcst = xr.DataArray(data=np.random.normal(size=(21,21,4,21,47)),
                            dims=['time','number','step','lat','lon'],
                            coords=dict(time=pd.date_range(start='2017-10-02',periods=21),
                                        step=pd.timedelta_range(start='3h',end='21h',freq='6h'),
                                        number=np.arange(0,21),
                                        lat=np.arange(15,-16.5,-1.5),
                                        lon=np.arange(90,160.5,1.5)),
                            name='tp')
    s2s_fit = xr.DataArray(dims=['time','number','step','lat','lon'],
                           coords=dict(time=s2s_fcst.time,number=s2s_fcst.number,
                                       step=pd.timedelta_range(start='0 day',end='46 day',freq='1 h'),
                                       lat=s2s_fcst.lat,lon=s2s_fcst.lon),
                           name='tp_fit')
    
    client = Client(n_workers=4)
    
    for l in range(0,2): #range(0,46):
        print('Lead '+str(l))
        rain_6hr = s2s_fcst[:,:,4*l:4*l+4,:,:]
        
        reshaped_ds = rain_6hr.transpose('step','time','number','lat','lon')
        reshaped_ds_chunk = reshaped_ds.chunk({'time': 3,'number':3})
        print(reshaped_ds_chunk)
        template = (xr.DataArray(data=np.zeros(shape=(24,21,21,21,47)),
                                dims=['step','time','number','lat','lon'],
                                coords=dict(time=s2s_fcst.time,number=s2s_fcst.number,
                                            step=pd.timedelta_range(start='0 hour',end='23 hours',freq='1H'),
                                            lat=s2s_fcst.lat,lon=s2s_fcst.lon))
                      .chunk({'time': 3,'number':3}))
        print(template)
        dc_harm_fit = reshaped_ds_chunk.map_blocks(harm_fit, template=template).compute(scheduler=client)
        print(dc_harm_fit)
        dc_harm_fit_transpose = dc_harm_fit.transpose('time','number','step','lat','lon')
        print(dc_harm_fit_transpose)

对于此代码,我尝试将预测降雨数据拟合到定义为

diurnal_cycle_model(hours, intercept, coef_cos, coef_sin)
的昼夜周期模型。

  1. 数据结构:
    s2s_fcst
    是预测降雨量数据,时间步长为6小时(
    step
    维度)。它具有预测初始时间
    time
    、预测时间步长
    step
    、集合成员
    number
    以及纬度/经度。这里用随机数代替数据进行演示。
  2. 我试图在每个预测日的
    step
    维度上拟合昼夜周期,即第一个预测日(循环
    l
    )包括
    step
    的0-3个时间步长,第二个预测日包括4个-第 7 个时间步长
    step
  3. harm_fit
    需要很长的计算时间。因此,我将数据分块并将其输入到
    map_blocks(harm_fit)
  4. harm_fit
    中,我们对
    diurnal_cycle_model
    上的4个预测时间步进行回归并获得回归系数。然后我们重建每小时降雨量(24 个时间步长,频率为 1 小时)。因此,
    harm_fit
    之后的输出数据具有24步的
    step
    ,而不是4步的输入。
    template
    map_block
    的步长尺寸为 24 小时。
  5. compute()
    之后,出现错误信息如下:

发生异常:ValueError
替换数据必须与变量的形状匹配。替换数据的形状为 (3, 21, 147, 47, 24);变量的形状为 (24, 21, 21, 21, 47)
文件“/Users/waynetsai/Desktop/test.py”,第 62 行,位于

dc_harm_fit = reshaped_ds_chunk.map_blocks(harm_fit, template=template).compute(scheduler=client)

我该如何解决这个问题?代码有什么问题吗?

感谢您的协助!

python parallel-processing dask python-xarray
1个回答
0
投票

“harm_fit”函数的输出与您提供的“map_blocks”模板之间的形状不匹配是问题的根源。虽然变量(模板)的形状为 (24, 21, 21, 21, 47),但错误表明替换数据(harm_fit 的输出)的形状为 (3, 21, 147, 47, 24)。

这种形状不匹配很可能是由于输入块大小(3 个步骤)和“harm_fit”的输出(24 个步骤)具有不同的步骤尺寸大小而引起的。使用“map_blocks”时,输出似乎具有不同的步长尺寸大小,即使该函数应该生成与输入块的大小匹配的块。

确保“输入块”大小和“harm_fit”输出具有相同的步长尺寸大小对于纠正此问题是必要的。可以将“harm_fit”函数更改为仅返回完成此操作的相关步骤,或者可以修改“harm_fit”中的逻辑和模板以考虑各种步长。

我希望这有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.