使用少量控制点(~200)的 RBF 对大量点(~150k)进行高效重复插值

问题描述 投票:0回答:1

我正在尝试解决最小二乘问题,其中我的自变量是数百个控制点中每个控制点的几个值,然后通过 RBFInterpolator(与邻居)将这些值插值到大量其他点<= 10), and then those interpolated values are used to compute the residuals - each point has some target values and the interpolated variables are used to generate predictions, from which I get residuals. These points are just coordinates in space, they are not on a regular grid or anything like that. The control points are a subset of the points.

用于最小二乘优化器的残差函数由工厂函数生成,看起来像这样(简化):

def create_global_residual_function(
    model: Callable[[np.ndarray, float, ...], np.ndarray],
    control_points: np.ndarray,
    points: np.ndarray,
    y: np.ndarray,
    x: np.ndarray,
    thin_plate_spline_smoothing: float,
    thin_plate_spline_neighbors: int,
    thin_plate_spline_degree: int,
    residual_boost_factor: float = 2.0,
) -> Callable[[Tuple[float, ...]], np.ndarray]:
    
    num_control_points = control_points.shape[0]
    num_points = points.shape[0]

    x = x.reshape(1, len(x))

    tps = RBFInterpolator(
        control_points,
        np.zeros((num_control_points, 5), dtype=float),
        smoothing = thin_plate_spline_smoothing,
        neighbors = thin_plate_spline_neighbors,
        kernel = "thin_plate_spline",
        degree = thin_plate_spline_degree
    )

    def global_residual_function(params: Tuple[float, ...]) -> np.ndarray:
        tps.d = np.array(params).reshape(num_control_points, <number of parameters>)
        model_parameters = tps(points)
        x0 = model_parameters[:, 0].reshape(num_points, 1)
        x1 = model_parameters[:, 1].reshape(num_points, 1)
        <and so on ...>
        y_hat = model(x, x0, x1, <and so on...>)
        return (y_hat - y).flatten()

    return global_residual_function

在每次迭代中,我更改的只是 RBFInterpolator 的

d
属性。我每次插值的点都是相同的。在模型收敛之前,插值必须发生大约 100 到 1000 次。当输入点不变时,重复调用 RBFInterpolator.call 对我来说似乎很愚蠢。我确信有大量不必要的重复计算来找到每个点最近的 X 控制点、计算距离并输入内核等。理论上应该有一个可以计算和的单个(稀疏)矩阵然后重复用于根据控制点上的标量数组的值计算点上的标量数组的值。

我的解决方案有效 - 我已经在小型测试用例(约 1000 个点和约 20 个控制点)上对其进行了测试,并从优化中获得了正确的结果。问题是它的效率非常低,并且不能很好地扩展到更大的问题。

scipy 中是否有更有效的解决方案?也许有某种方法可以提取我上面提到的(稀疏)矩阵?或者我应该为此应用程序使用不同的插值器类?

python scipy interpolation
1个回答
0
投票

是的,考虑到这些限制,重写 RBFInterpolator 可以获得大约 50% 的加速。

在编写任何代码之前,最好检查一下您是否正在优化正确的内容。我尝试的第一步是使用 line_profiler 来检查

__call__()
花费了多少时间来做可以通过缓存最近点来避免的事情。尽管它使用 KD-Tree 来加速对原始输入空间的搜索,但它仍然花费大约 50% 的时间搜索附近的点,对这些点进行排序,并对具有相同邻居的点进行重复数据删除。

我首先对 RBFInterpolator 进行子类化。我们需要重写两个方法:

__init__
__call__
。这使得我们能够重用大部分 RBFInterpolator。我首先从 SciPy 复制实现

接下来,我更改了 X、Y 和 D 的提供方式。 (注意:我使用 X、Y 和 D 的方式与 SciPy 文档 相同。)最初,代码假设您在创建 RBFInterpolator 时提供了 Y 和 D,并且您得到了一个 RBFInterpolator可以在任何 X 处进行评估。我对此进行了更改,以便它使用固定的 X 和 Y,并允许您更改 D。

接下来,我寻找如果 X 和 Y 固定的话可以避免的计算。我发现我可以避免每次重新计算

xindices
yindices
inv

import numpy as np
from scipy.interpolate import RBFInterpolator, _rbfinterp
import timeit


class RBFInterpolatorWithDynamicD(RBFInterpolator):
    def __init__(self, y, x, d_shape, *args, **kwargs):
        self.x = np.asarray(x, dtype=float, order="C")
        d = np.zeros(d_shape)
        ny, ndim = y.shape
        assert d.size % ny == 0, "d and y must have same first axis size"
        self.d_size_inner = d.size // ny
        super().__init__(y, d, *args, **kwargs)
        self.precalculate_derived_values()


    def precalculate_derived_values(self):
        # Get the indices of the k nearest observation points to each
        # evaluation point.
        _, yindices = self._tree.query(self.x, self.neighbors)
        if self.neighbors == 1:
            # `KDTree` squeezes the output when neighbors=1.
            yindices = yindices[:, None]

        # Multiple evaluation points may have the same neighborhood of
        # observation points. Make the neighborhoods unique so that we only
        # compute the interpolation coefficients once for each
        # neighborhood.
        yindices = np.sort(yindices, axis=1)
        yindices, inv = np.unique(yindices, return_inverse=True, axis=0)
        inv = np.reshape(inv, (-1,))  # flatten, we need 1-D indices

        # `inv` tells us which neighborhood will be used by each evaluation
        # point. Now we find which evaluation points will be using each
        # neighborhood.
        xindices = [[] for _ in range(len(yindices))]
        for i, j in enumerate(inv):
            xindices[j].append(i)

        self.yindices = yindices
        self.inv = inv
        self.xindices = xindices

    def __call__(self, d):
        """Evaluate the interpolant at `self.x`, with `d` values corresponding
        to original points at `self.y`."""
        ny, ndim = self.y.shape
        assert d.shape == (ny, *self.d_shape), f"Expecting D shape of {(ny, *self.d_shape)}"
        d = d.reshape((ny, -1))
        if self.x.ndim != 2:
            raise ValueError("`x` must be a 2-dimensional array.")

        nx, ndim = self.x.shape
        if ndim != self.y.shape[1]:
            raise ValueError("Expected the second axis of `x` to have length "
                             f"{self.y.shape[1]}.")

        # Our memory budget for storing RBF coefficients is
        # based on how many floats in memory we already occupy
        # If this number is below 1e6 we just use 1e6
        # This memory budget is used to decide how we chunk
        # the inputs
        memory_budget = max(self.x.size + self.y.size + d.size, 1000000)

        if self.neighbors is None:
            raise NotImplemented("self.neighbors is None is not implemented, use RBFInterpolator")
        else:
            yindices = self.yindices
            inv = self.inv
            xindices = self.xindices

            out = np.empty((nx, self.d_size_inner), dtype=float)
            for xidx, yidx in zip(xindices, yindices):
                # `yidx` are the indices of the observations in this
                # neighborhood. `xidx` are the indices of the evaluation points
                # that are using this neighborhood.
                xnbr = self.x[xidx]
                ynbr = self.y[yidx]
                dnbr = d[yidx]
                snbr = self.smoothing[yidx]
                shift, scale, coeffs = _rbfinterp._build_and_solve_system(
                    ynbr,
                    dnbr,
                    snbr,
                    self.kernel,
                    self.epsilon,
                    self.powers,
                )
                out[xidx] = self._chunk_evaluator(
                    xnbr,
                    ynbr,
                    shift,
                    scale,
                    coeffs,
                    memory_budget=memory_budget)

        out = out.astype(d.dtype)
        out = out.reshape((nx, ) + self.d_shape)
        return out


def main():
    # Create example dataset with 200 initial points
    Y = np.random.uniform(-1, 1, size=(200, 2))
    D = np.sum(Y, axis=1)*np.exp(-6*np.sum(Y**2, axis=1))
    # Infer the output on 150000 points
    X = np.random.uniform(-1, 1, size=(150000, 2))
    interp = RBFInterpolator(Y, D, neighbors=10)
    interp2 = RBFInterpolatorWithDynamicD(Y, X, d_shape=D.shape, neighbors=10)
    print("equal?", np.allclose(interp(X), interp2(D)))
    num_runs = 10
    print("classic", timeit.timeit('interp(X)', number=num_runs, globals={'interp': interp, 'X': X}) / num_runs)
    print("optimized", timeit.timeit('interp2(D)', number=num_runs, globals={'interp2': interp2, 'D': D}) / num_runs)


main()

该程序测试它是否获得相同的结果,并对其进行基准测试。

警告:该程序使用 SciPy API 的私有部分。这意味着 SciPy 的未来版本可能会更改此答案所依赖的库的部分内容。这可以通过重新实现 RBFInterpolator 的其余部分来避免。

© www.soinside.com 2019 - 2024. All rights reserved.